Implement stream command for training job #7939

Merged
saanikaguptamicrosoft merged 11 commits into Azure:foundry-training-dev from saanikaguptamicrosoft:saanika/stream2
Apr 29, 2026

Conversation

Collaborator

@saanikaguptamicrosoft saanikaguptamicrosoft commented Apr 28, 2026

Approach

Log file selection — Matches the Azure ML SDK pattern:

  • Primary: user_logs/std_log[\D]*[0]*(?:_ps)?\.txt (Common Runtime user logs — stdout/stderr from the training script)
  • Fallback: azureml-logs/[\d]{2}.+\.txt (legacy compute targets)
  • Activity/system logs are excluded — they use non-append-only blobs that cause streaming artifacts
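
The selection rule above can be sketched in Go roughly as follows. This is a minimal illustration, not the PR's actual filterLogFiles: the two patterns are copied from the description, while the helper name selectLogFiles, the leading ^ anchors, and the sample file names are assumptions.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// Patterns as quoted in the PR description. Anchoring them at the start of
// the name with ^ is an assumption made for this sketch.
var (
	primaryLogRe  = regexp.MustCompile(`^user_logs/std_log[\D]*[0]*(?:_ps)?\.txt`)
	fallbackLogRe = regexp.MustCompile(`^azureml-logs/[\d]{2}.+\.txt`)
)

// selectLogFiles (hypothetical helper) returns the names matching the primary
// pattern, or the names matching the fallback pattern when the primary
// pattern matches nothing. Results are sorted alphabetically.
func selectLogFiles(names []string) []string {
	pick := func(re *regexp.Regexp) []string {
		var out []string
		for _, n := range names {
			if re.MatchString(n) {
				out = append(out, n)
			}
		}
		sort.Strings(out)
		return out
	}
	if primary := pick(primaryLogRe); len(primary) > 0 {
		return primary
	}
	return pick(fallbackLogRe)
}

func main() {
	files := []string{
		"azureml-logs/70_driver_log.txt",
		"user_logs/std_log.txt",
		"system_logs/lifecycler.log",
	}
	fmt.Println(selectLogFiles(files))
}
```

Anything matching neither pattern, such as system logs, is simply never selected, which is one way to get the exclusion behavior described above.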

Polling & incremental output — Each poll cycle:

  • GET /jobs/{name} to check job status and extract the Tracking service endpoint
  • GET /history/v1.0/{workspace}/runs/{runId}/details (AML History API) to get log file SAS URIs
  • Full download of each matched log file via SAS URI
  • Line-count delta: skip already-printed lines, print only new ones
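
As a rough sketch of the line-count delta step: given the full content of a log file and the number of lines already printed, return only the new lines. The function name newLines and its signature are hypothetical and not taken from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// newLines (hypothetical helper) splits the full log content into lines and
// returns the ones after the first `printed` lines, together with the
// updated line count to remember for the next poll.
func newLines(content string, printed int) ([]string, int) {
	// Drop a single trailing newline so it does not yield an empty last line.
	content = strings.TrimSuffix(content, "\n")
	if content == "" {
		return nil, printed
	}
	lines := strings.Split(content, "\n")
	if printed >= len(lines) {
		return nil, printed
	}
	return lines[printed:], len(lines)
}

func main() {
	printed := 0
	// Each element stands in for one full download of the same growing file.
	for _, snapshot := range []string{"epoch 1\n", "epoch 1\nepoch 2\nepoch 3\n"} {
		var fresh []string
		fresh, printed = newLines(snapshot, printed)
		for _, l := range fresh {
			fmt.Println(l)
		}
	}
}
```

One limitation of this approach, at least in this sketch: if a download catches a line that is only partially written, the partial line is printed and counted, and its remainder is not printed on later polls.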

Poll interval — Sigmoid curve from 2s → 60s based on elapsed wall-clock time, matching the Azure ML SDK.

This keeps early streaming responsive (2s) while reducing API load for long-running jobs.
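
The exact formula is not shown in this description. A sketch of one sigmoid with the stated 2s floor and 60s ceiling might look like the following; the function name pollInterval and the shape constants 100 and 20 are illustrative assumptions, not values confirmed by the PR.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	minInterval = 2 * time.Second  // floor stated in the PR description
	maxInterval = 60 * time.Second // ceiling stated in the PR description
)

// pollInterval (hypothetical helper) maps elapsed wall-clock time to a poll
// interval using a logistic curve that approaches maxInterval, clamped from
// below at minInterval. The constants 100 and 20 only control where and how
// fast the curve rises; they are assumptions made for this sketch.
func pollInterval(elapsed time.Duration) time.Duration {
	secs := maxInterval.Seconds() / (1.0 + 100.0*math.Exp(-elapsed.Seconds()/20.0))
	d := time.Duration(secs * float64(time.Second))
	if d < minInterval {
		return minInterval
	}
	return d
}

func main() {
	for _, e := range []time.Duration{0, 30 * time.Second, 2 * time.Minute, 10 * time.Minute} {
		fmt.Printf("elapsed=%v interval=%v\n", e, pollInterval(e).Round(time.Millisecond))
	}
}
```

Deriving the numerator from maxInterval rather than a literal 60.0 keeps the ceiling in one place.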

Multi-file handling — For distributed/multi-node jobs with multiple log files:

  • Files are processed in alphabetical order
  • A multiFile flag latches true once >1 file is seen, enabling per-file headers on every poll for clarity
  • Single-file jobs get a clean, header-once experience

Terminal state handling — If the job is already in a terminal state when streaming starts, the command skips directly to the execution summary without downloading logs. If the job reaches a terminal state mid-stream, one final flush poll captures the remaining output before the summary is printed.

  • Terminal states — Completed, Failed, Canceled, NotResponding, Paused → final log flush + execution summary
  • Active states — NotStarted, Queued, Preparing, Provisioning, Starting, Running, Finalizing → continue polling
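
A minimal sketch of the state check, using the status names listed above. The helper name isTerminal is hypothetical, and treating an unrecognized status as active (so polling continues) is an assumption of this sketch.

```go
package main

import "fmt"

// terminalStates holds the statuses the PR description lists as terminal.
var terminalStates = map[string]bool{
	"Completed":     true,
	"Failed":        true,
	"Canceled":      true,
	"NotResponding": true,
	"Paused":        true,
}

// isTerminal (hypothetical helper) reports whether polling should stop.
// Any status not in the map, including unknown ones, counts as active.
func isTerminal(status string) bool {
	return terminalStates[status]
}

func main() {
	for _, s := range []string{"Queued", "Running", "Finalizing", "Completed", "Paused"} {
		fmt.Printf("%-10s terminal=%v\n", s, isTerminal(s))
	}
}
```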

Execution summary — On completion, prints RunId, Status, and Studio Web View URL.

Testing

  • azd x build
  • UTs: go test ./internal/service/ ./pkg/client/ -v -count=1

Screenshots:

  • Help command
  • Missing required field name
  • Running stream for a job already in terminal state
  • Streaming of running job

Contributor

@wbreza wbreza left a comment


Code Review: PR #7939 — Implement stream command for training job

TL;DR

Adds a job stream subcommand to the azure.ai.customtraining extension that streams real-time training job logs via polling with sigmoid-based adaptive intervals. Good feature with smart polling design, but has a potential build issue, no tests, and several error handling gaps.


🔴 Critical (1)

1. Undefined rootFlags — potential build failure

  • File: internal/cmd/job_stream.go (line 88)
  • Issue: References rootFlags.Debug but rootFlags is not defined in this file. If it's not a package-level global in another file in this package (e.g., root.go), this won't compile.
  • Suggested Fix: Define rootFlags in root.go (matching azure.ai.models pattern) or pass the debug flag through parameters.

🟠 High (2)

2. Zero test coverage for 507 lines of new code

  • Files: All 7 new/modified files
  • Issue: No _test.go files for any new code. Key untested paths: polling loop, sigmoid interval, filterLogFiles, parseTrackingEndpoint, GetBlobContent, retry logic, context cancellation.
  • Suggested Fix: Add stream_service_test.go with table-driven tests for the streaming lifecycle, retry thresholds, sigmoid boundaries, URL parsing, and log filtering. Use testify/mock for client dependencies.

3. Swallowed errors — silent data loss in log streaming

  • File: stream_service.go (lines 343-346, 382)
  • Issue: (a) flushLogs discards all errors: _, _, _ = s.pollAndPrintLogs(...). (b) Blob read errors are silently skipped with continue. Users get incomplete logs with no warning.
  • Suggested Fix: Log warnings on error so users know logs may be incomplete:
// flushLogs
_, _, err := s.pollAndPrintLogs(ctx, trackingEndpoint, jobName, processedLines, multiFile)
if err != nil {
    fmt.Fprintf(os.Stderr, "Warning: failed to flush final logs: %v\n", err)
}

🟡 Medium (7)

4. SSRF risk — SAS URIs used without domain validation

  • File: pkg/client/blob.go
  • Issue: GetBlobContent makes HTTP requests to SAS URIs from the API response without validating the domain. A compromised response could redirect to attacker-controlled endpoints.
  • Suggested Fix: Validate the parsed URL hostname ends with .blob.core.windows.net (or sovereign cloud variants).
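
One possible shape for that check, as a sketch only: the function name validateBlobURL and the suffix list are assumptions, and a real implementation would likely take the storage suffix from the active cloud configuration rather than a hard-coded list.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// allowedBlobSuffixes is an illustrative allow-list covering the public
// cloud plus two sovereign clouds; treat the exact contents as an assumption.
var allowedBlobSuffixes = []string{
	".blob.core.windows.net",
	".blob.core.chinacloudapi.cn",
	".blob.core.usgovcloudapi.net",
}

// validateBlobURL (hypothetical helper) rejects URLs that are not HTTPS or
// whose host is not under one of the allowed blob storage suffixes.
func validateBlobURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse blob URL: %w", err)
	}
	if u.Scheme != "https" {
		return fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	host := strings.ToLower(u.Hostname())
	for _, suffix := range allowedBlobSuffixes {
		if strings.HasSuffix(host, suffix) {
			return nil
		}
	}
	return fmt.Errorf("unexpected blob host %q", host)
}

func main() {
	for _, raw := range []string{
		"https://acct.blob.core.windows.net/logs/std_log.txt?sig=abc",
		"https://blob.core.windows.net.example.com/logs/std_log.txt",
	} {
		fmt.Printf("%s -> %v\n", raw, validateBlobURL(raw))
	}
}
```

Matching on a suffix that starts with a dot avoids accepting look-alike hosts that merely contain the storage domain as a prefix.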

5. Full blob re-downloaded on every poll

  • Files: blob.go, stream_service.go
  • Issue: Every poll downloads entire blob and re-splits all lines, printing only new ones. O(n²) network I/O. Consider HTTP Range headers to fetch only new content.

6. Context cancellation not honored during sleep

  • File: stream_service.go
  • Issue: time.Sleep() doesn't respond to context cancellation. Ctrl+C may take up to 60s. Use select { case <-ctx.Done(): ... case <-time.After(...): } instead.

7. No polling loop timeout

  • File: stream_service.go
  • Issue: Bare for {} loop runs indefinitely for stuck jobs. Add context.WithTimeout.

8. Missing HTTP request timeouts

  • Files: blob.go, history.go
  • Issue: No explicit request-level timeout on HTTP calls. Individual requests can hang indefinitely.

9. SAS tokens in debug logs

  • Files: client.go, history.go
  • Issue: Debug logging prints full URLs with potential SAS tokens in query params. Redact sensitive parameters before logging.
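
A sketch of one way to redact before logging. The helper name redactURL is hypothetical, and masking only the sig query parameter (the SAS signature) is an assumption; a real implementation might mask more parameters.

```go
package main

import (
	"fmt"
	"net/url"
)

// sensitiveParams lists query parameters to mask. Only the SAS signature is
// included here; extend as needed.
var sensitiveParams = []string{"sig"}

// redactURL (hypothetical helper) returns raw with sensitive query values
// replaced by a fixed marker. Note that re-encoding the query sorts the
// parameters by key, so the order may differ from the input.
func redactURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		return "<unparseable URL>"
	}
	q := u.Query()
	for _, key := range sensitiveParams {
		if _, ok := q[key]; ok {
			q.Set(key, "REDACTED")
		}
	}
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	// Placeholder URL and signature, for illustration only.
	fmt.Println(redactURL("https://acct.blob.core.windows.net/c/log.txt?sv=2021-08-06&sig=abc%2Fdef"))
}
```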

10. 1MB blob limit may truncate logs silently

  • File: blob.go
  • Issue: io.LimitReader(resp.Body, 1<<20) with no truncation warning. Verbose training logs may be cut off.

🟢 Low (8)

  11. Missing structured logging (stderr is adequate for CLI, but trace IDs would help debugging)
  12. StreamJobLogs at 79 lines — consider extracting helper methods
  13. Bare error returns at job_stream.go:93 and history.go:503 — wrap with fmt.Errorf
  14. Sigmoid formula hardcodes 60.0 — use maxInterval.Seconds() for maintainability
  15. Retry counter mixes job-status and log-streaming errors into one consecutiveErrs
  16. Job name validation only checks empty — consider MarkFlagRequired and length limits
  17. Blob client error handling inconsistent with history client (HandleError())
  18. processedLines map not synchronized — document single-threaded constraint

✅ What Looks Good

  • Sigmoid adaptive polling — smart approach to reduce API load for long-running jobs
  • Terminal state detection — properly skips streaming for completed/failed/canceled jobs
  • Defensive nil checks — prevents panics on missing data
  • 1MB blob limit — prevents memory exhaustion
  • No breaking changes — all changes are additive

Summary

Priority Count
🔴 Critical 1
🟠 High 2
🟡 Medium 7
🟢 Low 8
Total 18

Collaborator Author

saanikaguptamicrosoft commented Apr 29, 2026

1. Undefined rootFlags — potential build failure

  • File: internal/cmd/job_stream.go (line 88)
  • Issue: References rootFlags.Debug but rootFlags is not defined in this file. If it's not a package-level global in another file in this package (e.g., root.go), this won't compile.
  • Suggested Fix: Define rootFlags in root.go (matching azure.ai.models pattern) or pass the debug flag through parameters.

False positive: the build succeeds, and root.go already defines rootFlags.

2. Zero test coverage for 507 lines of new code

  • Files: All 7 new/modified files
  • Issue: No _test.go files for any new code. Key untested paths: polling loop, sigmoid interval, filterLogFiles, parseTrackingEndpoint, GetBlobContent, retry logic, context cancellation.
  • Suggested Fix: Add stream_service_test.go with table-driven tests for the streaming lifecycle, retry thresholds, sigmoid boundaries, URL parsing, and log filtering. Use testify/mock for client dependencies.

Added tests

3. Swallowed errors — silent data loss in log streaming

  • File: stream_service.go (lines 343-346, 382)
  • Issue: (a) flushLogs discards all errors: _, _, _ = s.pollAndPrintLogs(...). (b) Blob read errors are silently skipped with continue. Users get incomplete logs with no warning.
  • Suggested Fix: Log warnings on error so users know logs may be incomplete:
// flushLogs
_, _, err := s.pollAndPrintLogs(ctx, trackingEndpoint, jobName, processedLines, multiFile)
if err != nil {
    fmt.Fprintf(os.Stderr, "Warning: failed to flush final logs: %v\n", err)
}

Makes sense, updated

4. SSRF risk — SAS URIs used without domain validation

  • File: pkg/client/blob.go
  • Issue: GetBlobContent makes HTTP requests to SAS URIs from the API response without validating the domain. A compromised response could redirect to attacker-controlled endpoints.
  • Suggested Fix: Validate the parsed URL hostname ends with .blob.core.windows.net (or sovereign cloud variants).

Risk is low since URIs come from authenticated Azure API. Don't want to hard-code any validations here.

5. Full blob re-downloaded on every poll

  • Files: blob.go, stream_service.go
  • Issue: Every poll downloads entire blob and re-splits all lines, printing only new ones. O(n²) network I/O. Consider HTTP Range headers to fetch only new content.
[screenshot]

6. Context cancellation not honored during sleep

  • File: stream_service.go
  • Issue: time.Sleep() doesn't respond to context cancellation. Ctrl+C may take up to 60s. Use select { case <-ctx.Done(): ... case <-time.After(...): } instead.

Good point, added

7. No polling loop timeout

  • File: stream_service.go
  • Issue: Bare for {} loop runs indefinitely for stuck jobs. Add context.WithTimeout.

Training jobs can run for hours/days. A hard timeout would kill legitimate streams. User has Ctrl+C.

8. Missing HTTP request timeouts

  • Files: blob.go, history.go
  • Issue: No explicit request-level timeout on HTTP calls. Individual requests can hang indefinitely.

Already handled

9. SAS tokens in debug logs

  • Files: client.go, history.go
  • Issue: Debug logging prints full URLs with potential SAS tokens in query params. Redact sensitive parameters before logging.

SAS URIs aren't logged. Debug output covers only the AML History API URL (bearer auth, no SAS). blob.go makes its own request without debug logging.

10. 1MB blob limit may truncate logs silently

  • File: blob.go
  • Issue: io.LimitReader(resp.Body, 1<<20) with no truncation warning. Verbose training logs may be cut off.

Unlikely in practice for training logs, and hard to detect cleanly with LimitReader. Low priority.

@saanikaguptamicrosoft saanikaguptamicrosoft merged commit 4cbb9c0 into Azure:foundry-training-dev Apr 29, 2026
1 of 2 checks passed

3 participants