Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions acceptance/experimental/air/cancel/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions acceptance/experimental/air/cancel/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

=== cancel by id (text)
>>> [CLI] experimental air cancel 123
Successfully requested cancellation for run 123

=== cancel by id (json)
>>> [CLI] experimental air cancel 123 -o json
{
"v": 1,
"ts": "[TIMESTAMP]",
"data": {
"cancelled": [
"123"
]
}
}

=== cancel multiple ids
>>> [CLI] experimental air cancel 123 456
Successfully requested cancellation for run 123
Successfully requested cancellation for run 456
Successfully requested cancellation for 2 run(s).

=== cancel --all
>>> [CLI] experimental air cancel --all -y
Searching active runs for [USERNAME] in [DATABRICKS_URL]...
Successfully requested cancellation for run [NUMID]
Successfully requested cancellation for run [NUMID]
Successfully requested cancellation for 2 run(s).
11 changes: 11 additions & 0 deletions acceptance/experimental/air/cancel/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
title "cancel by id (text)"
trace $CLI experimental air cancel 123

title "cancel by id (json)"
trace $CLI experimental air cancel 123 -o json

title "cancel multiple ids"
trace $CLI experimental air cancel 123 456

title "cancel --all"
trace $CLI experimental air cancel --all -y
53 changes: 53 additions & 0 deletions acceptance/experimental/air/cancel/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# This command does not deploy a bundle, so no engine matrix is needed.
[EnvMatrix]
DATABRICKS_BUNDLE_ENGINE = []

# The SDK occasionally probes host reachability with a HEAD request; stub it so
# the test is deterministic.
[[Server]]
Pattern = "HEAD /"
Response.Body = ''

# CancelRun accepts the request and returns an empty body.
[[Server]]
Pattern = "POST /api/2.2/jobs/runs/cancel"
Response.Body = '{}'

# Jobs runs/list backs `cancel --all`: two active AIR runs for the current user
# (tester@databricks.com, from the built-in scim/v2/Me handler).
[[Server]]
Pattern = "GET /api/2.2/jobs/runs/list"
Response.Body = '''
{
"runs": [
{
"run_id": 334747067049496,
"run_name": "qwen-train",
"creator_user_name": "tester@databricks.com",
"start_time": 1717608759000,
"state": {"life_cycle_state": "RUNNING"},
"tasks": [{
"run_id": 334747067049497,
"ai_runtime_task": {
"experiment": "/Users/tester@databricks.com/qwen-train",
"deployments": [{"compute": {"accelerator_type": "GPU_1xA10", "accelerator_count": 1}}]
}
}]
},
{
"run_id": 566001814929041,
"run_name": "llama-train",
"creator_user_name": "tester@databricks.com",
"start_time": 1717612404000,
"state": {"life_cycle_state": "RUNNING"},
"tasks": [{
"run_id": 566001814929042,
"ai_runtime_task": {
"experiment": "/Users/tester@databricks.com/llama-train",
"deployments": [{"compute": {"accelerator_type": "GPU_1xA10", "accelerator_count": 1}}]
}
}]
}
]
}
'''
6 changes: 0 additions & 6 deletions acceptance/experimental/air/unimplemented/output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ Error: `air logs` is not implemented yet

Exit code: 1

=== cancel
>>> [CLI] experimental air cancel 123
Error: `air cancel` is not implemented yet

Exit code: 1

=== register-image
>>> [CLI] experimental air register-image my-image:latest
Error: `air register-image` is not implemented yet
Expand Down
3 changes: 0 additions & 3 deletions acceptance/experimental/air/unimplemented/script
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,5 @@
title "logs"
errcode trace $CLI experimental air logs 123

title "cancel"
errcode trace $CLI experimental air cancel 123

title "register-image"
errcode trace $CLI experimental air register-image my-image:latest
189 changes: 186 additions & 3 deletions experimental/air/cmd/cancel.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
package aircmd

import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"text/tabwriter"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdctx"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/flags"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/spf13/cobra"
)

// cancelData is the JSON payload printed by `air cancel`. `all` is set only for
// --all, `workspace` only when --all finds no active runs, and `failed` only
// when a run could not be cancelled.
type cancelData struct {
Cancelled []string `json:"cancelled"`
All bool `json:"all,omitempty"`
Workspace string `json:"workspace,omitempty"`
Failed []cancelFailure `json:"failed,omitempty"`
}

type cancelFailure struct {
RunID string `json:"run_id"`
Error string `json:"error"`
}

func newCancelCommand() *cobra.Command {
var (
all bool
Expand All @@ -15,9 +45,6 @@ func newCancelCommand() *cobra.Command {
Use: "cancel [JOB_RUN_ID...]",
Short: "Cancel one or more runs",
Long: `Cancel one or more runs by ID, or cancel all of your active runs with --all.`,
RunE: func(cmd *cobra.Command, args []string) error {
return notImplemented("cancel")
},
}

cmd.Flags().BoolVar(&all, "all", false, "Cancel all of your active runs")
Expand All @@ -35,5 +62,161 @@ func newCancelCommand() *cobra.Command {
return nil
}

// In -o json mode an auth failure should be a JSON error envelope, not a bare
// error. ErrAlreadyPrinted passes through (already handled upstream).
cmd.PreRunE = func(cmd *cobra.Command, args []string) error {
err := root.MustWorkspaceClient(cmd, args)
if err == nil || errors.Is(err, root.ErrAlreadyPrinted) {
return err
}
return renderError(cmd.Context(), cmd, "INTERNAL_ERROR", "TRANSIENT", true, err)
}

cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
w := cmdctx.WorkspaceClient(ctx)
jsonOut := root.OutputType(cmd) == flags.OutputJSON

runIDs := args
data := cancelData{Cancelled: []string{}}

if all {
data.All = true

me, err := w.CurrentUser.Me(ctx, iam.MeRequest{})
if err != nil {
return renderError(ctx, cmd, "INTERNAL_ERROR", "TRANSIENT", true,
fmt.Errorf("failed to resolve current user: %w", err))
}
host := strings.TrimRight(w.Config.Host, "/")

if !jsonOut {
cmdio.LogString(ctx, fmt.Sprintf("Searching active runs for %s in %s...", me.UserName, host))
}

// Fetch every active run (up to the scan bound) so --all cancels all
// of them, not just the first page.
fetcher := newRunFetcher(ctx, w, listQuery{activeOnly: true, userFilter: me.UserName})
rows, err := fetcher.next(maxListScan)
if err != nil {
return renderError(ctx, cmd, "INTERNAL_ERROR", "TRANSIENT", true,
fmt.Errorf("failed to list active runs: %w", err))
}

runIDs = make([]string, 0, len(rows))
for i := range rows {
if rows[i].RunID != "" {
runIDs = append(runIDs, rows[i].RunID)
}
}

if len(runIDs) == 0 {
if jsonOut {
data.Workspace = host
return renderEnvelope(ctx, data)
}
cmdio.LogString(ctx, "No active runs found.")
return nil
}

if !yes {
displayCancelPreview(ctx, rows, host)
confirmed, err := cmdio.AskYesOrNo(ctx, fmt.Sprintf("\nCancel %d run(s) in %s?", len(runIDs), host))
if err != nil {
return err
}
if !confirmed {
cmdio.LogString(ctx, "Cancellation aborted.")
return root.ErrAlreadyPrinted
}
}
}

for _, rid := range runIDs {
err := cancelRun(ctx, w, rid)
if err != nil {
data.Failed = append(data.Failed, cancelFailure{RunID: rid, Error: err.Error()})
if !jsonOut {
if runNotFound(err) {
cmdio.LogString(ctx, fmt.Sprintf("Run %s not found. Please check the run ID and ensure you're using a Job Run ID.", rid))
} else {
cmdio.LogString(ctx, fmt.Sprintf("Failed to cancel run %s: %s", rid, err))
}
}
continue
}
data.Cancelled = append(data.Cancelled, rid)
if !jsonOut {
cmdio.LogString(ctx, "Successfully requested cancellation for run "+rid)
}
}

if jsonOut {
if err := renderEnvelope(ctx, data); err != nil {
return err
}
// Print the envelope, but still exit non-zero on any failure.
if len(data.Failed) > 0 {
return root.ErrAlreadyPrinted
}
return nil
}

if len(data.Failed) > 0 {
cmdio.LogString(ctx, fmt.Sprintf("%d run(s) failed to cancel.", len(data.Failed)))
return root.ErrAlreadyPrinted
}
if all || len(data.Cancelled) > 1 {
cmdio.LogString(ctx, fmt.Sprintf("Successfully requested cancellation for %d run(s).", len(data.Cancelled)))
}
return nil
}

return cmd
}

// runNotFound reports whether err means the run does not exist. The cancel
// endpoint returns 400 INVALID_PARAMETER_VALUE ("Run <id> does not exist") for
// an unknown run, and the SDK only remaps that to ErrResourceDoesNotExist for
// the runs/get path, not cancel — so we also detect the raw code here.
func runNotFound(err error) bool {
if errors.Is(err, apierr.ErrResourceDoesNotExist) {
return true
}
if apiErr, ok := errors.AsType[*apierr.APIError](err); ok {
return apiErr.StatusCode == http.StatusBadRequest && apiErr.ErrorCode == "INVALID_PARAMETER_VALUE"
}
return false
}

// cancelRun requests cancellation of a single job run. The cancel is async, so
// the returned waiter is ignored.
func cancelRun(ctx context.Context, w *databricks.WorkspaceClient, rid string) error {
runID, err := strconv.ParseInt(rid, 10, 64)
if err != nil || runID <= 0 {
return fmt.Errorf("invalid run ID %q: must be a positive integer", rid)
}
_, err = w.Jobs.CancelRun(ctx, jobs.CancelRun{RunId: runID})
return err
}

// displayCancelPreview shows the runs that `cancel --all` is about to terminate.
func displayCancelPreview(ctx context.Context, rows []listRow, host string) {
var sb strings.Builder
fmt.Fprintf(&sb, "\nWorkspace: %s\n", host)
fmt.Fprintf(&sb, "Found %d active run(s) to cancel:\n\n", len(rows))

tw := tabwriter.NewWriter(&sb, 0, 0, 2, ' ', 0)
fmt.Fprintln(tw, "Run ID\tExperiment\tStarted")
for i := range rows {
experiment := orNA(rows[i].Experiment)
started := na
if rows[i].StartedAt != nil {
started = *rows[i].StartedAt
}
fmt.Fprintf(tw, "%s\t%s\t%s\n", rows[i].RunID, experiment, started)
}
tw.Flush()

cmdio.LogString(ctx, strings.TrimRight(sb.String(), "\n"))
}
Loading
Loading