Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 89 additions & 50 deletions experimental/air/cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ type listedRun struct {
taskRunID int64
}

// listQuery holds the resolved inputs to listAirRuns.
// listQuery holds the resolved inputs to a runFetcher.
type listQuery struct {
activeOnly bool
allUsers bool
userFilter string
filters listFilters
limit int
fetchMLflow bool
}

Expand Down Expand Up @@ -113,34 +111,47 @@ func newListCommand() *cobra.Command {
userFilter = me.UserName
}

rows, err := listAirRuns(ctx, w, listQuery{
fetcher := newRunFetcher(ctx, w, listQuery{
activeOnly: active,
allUsers: allUsers,
userFilter: userFilter,
filters: f,
limit: limit,
fetchMLflow: root.OutputType(cmd) == flags.OutputText,
})
if err != nil {
return err
}

// JSON keeps the air envelope; text renders the table (interactive and
// navigable in a terminal, printed once when piped).
// JSON prints the newest `limit` runs once. Text renders the table:
// navigable in a terminal (paging in older runs on demand), printed once
// when piped.
if root.OutputType(cmd) == flags.OutputJSON {
rows, err := fetcher.next(limit)
if err != nil {
return err
}
warnIfTruncated(ctx, fetcher)
return renderEnvelope(ctx, listData{Rows: rows})
}
return renderListText(cmd, rows)
return renderListText(cmd, fetcher, limit)
}

return cmd
}

// listAirRuns pages through Jobs runs/list, keeps the AIR runs that match the
// user and filters, and stops once it has enough matches or has scanned
// maxListScan runs. Detail comes straight from runs/list (expand_tasks), so no
// per-run calls are needed.
func listAirRuns(ctx context.Context, w *databricks.WorkspaceClient, q listQuery) ([]listRow, error) {
// runFetcher pages Jobs runs/list on demand, yielding AIR runs that match the
// user and filters. It buffers a page's leftover runs so successive next() calls
// resume where the last stopped — driving both one-shot output and lazy paging.
type runFetcher struct {
ctx context.Context
w *databricks.WorkspaceClient
query map[string]any
userFilter string
filters listFilters
fetchMLflow bool

pending []jobRun // runs from the last page not yet inspected
scanned int
exhausted bool
}

func newRunFetcher(ctx context.Context, w *databricks.WorkspaceClient, q listQuery) *runFetcher {
query := map[string]any{
"run_type": "SUBMIT_RUN",
"expand_tasks": true,
Expand All @@ -149,51 +160,55 @@ func listAirRuns(ctx context.Context, w *databricks.WorkspaceClient, q listQuery
if q.activeOnly {
query["active_only"] = true
}
return &runFetcher{
ctx: ctx,
w: w,
query: query,
userFilter: q.userFilter,
filters: q.filters,
fetchMLflow: q.fetchMLflow,
}
}

// next returns up to want more matching rows, paging runs/list (and buffering the
// leftover runs of a page) until it has enough, the server has no more pages, or
// it has scanned maxListScan runs. MLflow links are filled in for text output.
func (f *runFetcher) next(want int) ([]listRow, error) {
var entries []listedRun
scanned := 0
done := false
for !done && scanned < maxListScan {
resp, err := fetchJobRunsPage(ctx, w, query)
if err != nil {
return nil, err
}

for i := range resp.Runs {
run := &resp.Runs[i]
scanned++

if !isAirRun(run) {
continue
}
if q.userFilter != "" && run.CreatorUserName != q.userFilter {
continue
}
if !q.filters.matches(run) {
continue
}

entries = append(entries, listedRun{row: buildListRow(run), taskRunID: taskRunID(run)})
if len(entries) >= q.limit {
done = true
for len(entries) < want {
if len(f.pending) == 0 {
if f.exhausted || f.scanned >= maxListScan {
break
}
if err := f.fetchPage(); err != nil {
return nil, err
}
continue
}

if done || resp.NextPageToken == "" {
break
run := &f.pending[0]
f.pending = f.pending[1:]
f.scanned++

if !isAirRun(run) {
continue
}
query["page_token"] = resp.NextPageToken
if f.userFilter != "" && run.CreatorUserName != f.userFilter {
continue
}
if !f.filters.matches(run) {
continue
}
entries = append(entries, listedRun{row: buildListRow(run), taskRunID: taskRunID(run)})
}

if !done && scanned >= maxListScan {
log.Warnf(ctx, "air list: stopped after scanning %d runs; results may be incomplete", maxListScan)
if f.scanned >= maxListScan {
f.exhausted = true
}

// MLflow links appear only in the text table, so the per-run get-output
// lookups are skipped for JSON output (which omits the column anyway).
if q.fetchMLflow {
setMLflowLinks(ctx, w, entries)
if f.fetchMLflow {
setMLflowLinks(f.ctx, f.w, entries)
}

rows := make([]listRow, len(entries))
Expand All @@ -203,6 +218,30 @@ func listAirRuns(ctx context.Context, w *databricks.WorkspaceClient, q listQuery
return rows, nil
}

// fetchPage loads the next runs/list page into the pending buffer, marking the
// fetcher exhausted once the server reports no further pages.
func (f *runFetcher) fetchPage() error {
resp, err := fetchJobRunsPage(f.ctx, f.w, f.query)
if err != nil {
return err
}
f.pending = resp.Runs
if resp.NextPageToken == "" {
f.exhausted = true
} else {
f.query["page_token"] = resp.NextPageToken
}
return nil
}

// warnIfTruncated logs when the scan hit maxListScan, so one-shot output signals
// its results may be incomplete.
func warnIfTruncated(ctx context.Context, f *runFetcher) {
if f.scanned >= maxListScan {
log.Warnf(ctx, "air list: stopped after scanning %d runs; results may be incomplete", maxListScan)
}
}

// setMLflowLinks fills in each row's MLflow link in parallel, best-effort: a row
// whose IDs can't be resolved keeps its "-" placeholder.
func setMLflowLinks(ctx context.Context, w *databricks.WorkspaceClient, entries []listedRun) {
Expand Down
5 changes: 2 additions & 3 deletions experimental/air/cmd/list_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"time"
)

// buildListRow extracts the columns shown for one run. Optional text columns fall
// back to "-" so the table stays aligned. MLflow links aren't carried by
// runs/list, so the column shows "-".
// buildListRow extracts the columns shown for one run. Optional cells fall back
// to "-"; MLflowURL starts as "-" and setMLflowLinks fills it in for text output.
func buildListRow(run *jobRun) listRow {
experiment := "-"
if e := jobExperiment(run); e != "" {
Expand Down
43 changes: 35 additions & 8 deletions experimental/air/cmd/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ func TestListAirRunsFiltersUserAndType(t *testing.T) {
}
srv := runsServer(t, runsListBody(t, "", runs...))

rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{
rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{
userFilter: "me@example.com",
limit: 10,
})
}).next(10)
require.NoError(t, err)
require.Len(t, rows, 2)
assert.Equal(t, "1", rows[0].RunID)
Expand All @@ -83,10 +82,9 @@ func TestListAirRunsExperimentFilter(t *testing.T) {
}
srv := runsServer(t, runsListBody(t, "", runs...))

rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{
limit: 10,
rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{
filters: listFilters{Experiment: "qwen*"},
})
}).next(10)
require.NoError(t, err)
require.Len(t, rows, 1)
assert.Equal(t, "1", rows[0].RunID)
Expand All @@ -100,7 +98,7 @@ func TestListAirRunsLimitTruncates(t *testing.T) {
}
srv := runsServer(t, runsListBody(t, "", runs...))

rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{limit: 2})
rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{}).next(2)
require.NoError(t, err)
require.Len(t, rows, 2)
assert.Equal(t, "1", rows[0].RunID)
Expand All @@ -112,13 +110,42 @@ func TestListAirRunsPaginates(t *testing.T) {
page2 := runsListBody(t, "", airJobRun(2, "me@example.com", "GPU_1xH100", 1, "exp-b"))
srv := runsServer(t, page1, page2)

rows, err := listAirRuns(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{limit: 10})
rows, err := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{}).next(10)
require.NoError(t, err)
require.Len(t, rows, 2)
assert.Equal(t, "1", rows[0].RunID)
assert.Equal(t, "2", rows[1].RunID)
}

// TestRunFetcherResumesAcrossCalls covers the lazy paging the interactive table
// relies on: a next() that stops mid-page must buffer the rest and hand it back on
// the following call, then report exhaustion — without re-fetching.
func TestRunFetcherResumesAcrossCalls(t *testing.T) {
runs := []jobRun{
airJobRun(1, "me@example.com", "GPU_1xH100", 1, "exp-a"),
airJobRun(2, "me@example.com", "GPU_1xH100", 1, "exp-b"),
airJobRun(3, "me@example.com", "GPU_1xH100", 1, "exp-c"),
}
srv := runsServer(t, runsListBody(t, "", runs...))
f := newRunFetcher(t.Context(), newTestWorkspaceClient(t, srv.URL), listQuery{})

first, err := f.next(2)
require.NoError(t, err)
require.Len(t, first, 2)
assert.Equal(t, "1", first[0].RunID)
assert.Equal(t, "2", first[1].RunID)

second, err := f.next(2)
require.NoError(t, err)
require.Len(t, second, 1) // only the buffered leftover remains
assert.Equal(t, "3", second[0].RunID)
assert.True(t, f.exhausted)

third, err := f.next(2)
require.NoError(t, err)
assert.Empty(t, third)
}

// TestFetchJobRunsParsesAiRuntimeTask pins the raw parse against the real
// runs/get shape, since the typed SDK omits ai_runtime_task.
func TestFetchJobRunsParsesAiRuntimeTask(t *testing.T) {
Expand Down
Loading
Loading