diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 8320d92369..ca3507033a 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -13,6 +13,7 @@ * Make sure warnings asking for approval are understood by agents ([#5239](https://github.com/databricks/cli/pull/5239)) * Support `replace_existing: true` on `postgres_branches` and `postgres_endpoints` so bundles can manage the implicitly-created production branch and primary read-write endpoint of a Lakebase project. * Add `postgres_catalogs` resource to bind a Unity Catalog catalog to a Postgres database on a Lakebase Autoscaling branch ([#5265](https://github.com/databricks/cli/pull/5265)). +* Add `postgres_synced_tables` resource to sync a Unity Catalog Delta table into a Postgres table on a Lakebase Autoscaling branch ([#5268](https://github.com/databricks/cli/pull/5268)). * engine/direct: Changes to state file now persisted to .wal file right away instead of being saved in the end ([#5149](https://github.com/databricks/cli/pull/5149)) ### Dependency updates diff --git a/acceptance/bundle/invariant/configs/postgres_synced_table.yml.tmpl b/acceptance/bundle/invariant/configs/postgres_synced_table.yml.tmpl new file mode 100644 index 0000000000..7a548a0275 --- /dev/null +++ b/acceptance/bundle/invariant/configs/postgres_synced_table.yml.tmpl @@ -0,0 +1,16 @@ +bundle: + name: test-bundle-$UNIQUE_NAME + +resources: + postgres_synced_tables: + foo: + synced_table_id: lakebase_$UNIQUE_NAME.public.trips_synced + source_table_full_name: main.raw.trips + primary_key_columns: [id] + scheduling_policy: SNAPSHOT + postgres_database: appdb + branch: projects/test-pg-project-$UNIQUE_NAME/branches/production + create_database_objects_if_missing: true + new_pipeline_spec: + storage_catalog: main + storage_schema: pipelines diff --git a/acceptance/bundle/invariant/continue_293/out.test.toml b/acceptance/bundle/invariant/continue_293/out.test.toml index 3bff3e2836..0b5736c7f0 100644 --- a/acceptance/bundle/invariant/continue_293/out.test.toml +++ b/acceptance/bundle/invariant/continue_293/out.test.toml @@ -29,6 +29,7 @@ EnvMatrix.INPUT_CONFIG = [ "postgres_catalog.yml.tmpl", "postgres_endpoint.yml.tmpl", "postgres_project.yml.tmpl", + "postgres_synced_table.yml.tmpl", "registered_model.yml.tmpl", "schema.yml.tmpl", "schema_grant_ref.yml.tmpl", diff --git a/acceptance/bundle/invariant/migrate/out.test.toml b/acceptance/bundle/invariant/migrate/out.test.toml index 3bff3e2836..0b5736c7f0 100644 --- a/acceptance/bundle/invariant/migrate/out.test.toml +++ b/acceptance/bundle/invariant/migrate/out.test.toml @@ -29,6 +29,7 @@ EnvMatrix.INPUT_CONFIG = [ "postgres_catalog.yml.tmpl", "postgres_endpoint.yml.tmpl", "postgres_project.yml.tmpl", + "postgres_synced_table.yml.tmpl", "registered_model.yml.tmpl", "schema.yml.tmpl", "schema_grant_ref.yml.tmpl", diff --git a/acceptance/bundle/invariant/no_drift/out.test.toml b/acceptance/bundle/invariant/no_drift/out.test.toml index 3bff3e2836..0b5736c7f0 100644 --- a/acceptance/bundle/invariant/no_drift/out.test.toml +++ b/acceptance/bundle/invariant/no_drift/out.test.toml @@ -29,6 +29,7 @@ EnvMatrix.INPUT_CONFIG = [ "postgres_catalog.yml.tmpl", "postgres_endpoint.yml.tmpl", "postgres_project.yml.tmpl", + "postgres_synced_table.yml.tmpl", "registered_model.yml.tmpl", "schema.yml.tmpl", "schema_grant_ref.yml.tmpl", diff --git a/acceptance/bundle/invariant/test.toml b/acceptance/bundle/invariant/test.toml index abfb10c5f7..021beef634 100644 --- a/acceptance/bundle/invariant/test.toml +++ b/acceptance/bundle/invariant/test.toml @@ -47,6 +47,7 @@ EnvMatrix.INPUT_CONFIG = [ "postgres_catalog.yml.tmpl", "postgres_endpoint.yml.tmpl", "postgres_project.yml.tmpl", + "postgres_synced_table.yml.tmpl", "registered_model.yml.tmpl", "schema.yml.tmpl", "schema_grant_ref.yml.tmpl", @@ -69,6 +70,7 @@ no_postgres_project_on_cloud = ["CONFIG_Cloud=true", "INPUT_CONFIG=postgres_proj no_postgres_branch_on_cloud = ["CONFIG_Cloud=true", "INPUT_CONFIG=postgres_branch.yml.tmpl"] no_postgres_endpoint_on_cloud = ["CONFIG_Cloud=true", "INPUT_CONFIG=postgres_endpoint.yml.tmpl"] no_postgres_catalog_on_cloud = ["CONFIG_Cloud=true", "INPUT_CONFIG=postgres_catalog.yml.tmpl"] +no_postgres_synced_table_on_cloud = ["CONFIG_Cloud=true", "INPUT_CONFIG=postgres_synced_table.yml.tmpl"] # External locations require actual storage credentials with cloud IAM setup # which are environment-specific, so we only test locally with the mock server diff --git a/acceptance/bundle/refschema/out.fields.txt b/acceptance/bundle/refschema/out.fields.txt index d50035a048..42a2026c5e 100644 --- a/acceptance/bundle/refschema/out.fields.txt +++ b/acceptance/bundle/refschema/out.fields.txt @@ -2850,6 +2850,49 @@ resources.postgres_projects.*.permissions[*].group_name string ALL resources.postgres_projects.*.permissions[*].level iam.PermissionLevel ALL resources.postgres_projects.*.permissions[*].service_principal_name string ALL resources.postgres_projects.*.permissions[*].user_name string ALL +resources.postgres_synced_tables.*.branch string ALL +resources.postgres_synced_tables.*.create_database_objects_if_missing bool ALL +resources.postgres_synced_tables.*.create_time *time.Time REMOTE +resources.postgres_synced_tables.*.existing_pipeline_id string ALL +resources.postgres_synced_tables.*.id string INPUT +resources.postgres_synced_tables.*.lifecycle resources.Lifecycle INPUT +resources.postgres_synced_tables.*.lifecycle.prevent_destroy bool INPUT +resources.postgres_synced_tables.*.modified_status string INPUT +resources.postgres_synced_tables.*.name string REMOTE +resources.postgres_synced_tables.*.new_pipeline_spec *postgres.NewPipelineSpec ALL +resources.postgres_synced_tables.*.new_pipeline_spec.budget_policy_id string ALL +resources.postgres_synced_tables.*.new_pipeline_spec.storage_catalog string ALL +resources.postgres_synced_tables.*.new_pipeline_spec.storage_schema string ALL +resources.postgres_synced_tables.*.postgres_database string ALL +resources.postgres_synced_tables.*.primary_key_columns []string ALL +resources.postgres_synced_tables.*.primary_key_columns[*] string ALL +resources.postgres_synced_tables.*.scheduling_policy postgres.SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy ALL +resources.postgres_synced_tables.*.source_table_full_name string ALL +resources.postgres_synced_tables.*.status *postgres.SyncedTableSyncedTableStatus REMOTE +resources.postgres_synced_tables.*.status.detailed_state postgres.SyncedTableState REMOTE +resources.postgres_synced_tables.*.status.last_processed_commit_version int64 REMOTE +resources.postgres_synced_tables.*.status.last_sync *postgres.SyncedTablePosition REMOTE +resources.postgres_synced_tables.*.status.last_sync.delta_table_sync_info *postgres.DeltaTableSyncInfo REMOTE +resources.postgres_synced_tables.*.status.last_sync.delta_table_sync_info.delta_commit_time *time.Time REMOTE +resources.postgres_synced_tables.*.status.last_sync.delta_table_sync_info.delta_commit_version int64 REMOTE +resources.postgres_synced_tables.*.status.last_sync.sync_end_time *time.Time REMOTE +resources.postgres_synced_tables.*.status.last_sync.sync_start_time *time.Time REMOTE +resources.postgres_synced_tables.*.status.last_sync_time *time.Time REMOTE +resources.postgres_synced_tables.*.status.message string REMOTE +resources.postgres_synced_tables.*.status.ongoing_sync_progress *postgres.SyncedTablePipelineProgress REMOTE +resources.postgres_synced_tables.*.status.ongoing_sync_progress.estimated_completion_time_seconds float64 REMOTE +resources.postgres_synced_tables.*.status.ongoing_sync_progress.latest_version_currently_processing int64 REMOTE +resources.postgres_synced_tables.*.status.ongoing_sync_progress.sync_progress_completion float64 REMOTE +resources.postgres_synced_tables.*.status.ongoing_sync_progress.synced_row_count int64 REMOTE +resources.postgres_synced_tables.*.status.ongoing_sync_progress.total_row_count int64 REMOTE +resources.postgres_synced_tables.*.status.pipeline_id string REMOTE +resources.postgres_synced_tables.*.status.project string REMOTE +resources.postgres_synced_tables.*.status.provisioning_phase postgres.ProvisioningPhase REMOTE +resources.postgres_synced_tables.*.status.unity_catalog_provisioning_state postgres.ProvisioningInfoState REMOTE +resources.postgres_synced_tables.*.synced_table_id string ALL +resources.postgres_synced_tables.*.timeseries_key string ALL +resources.postgres_synced_tables.*.uid string REMOTE +resources.postgres_synced_tables.*.url string INPUT resources.quality_monitors.*.assets_dir string ALL resources.quality_monitors.*.baseline_table_name string ALL resources.quality_monitors.*.custom_metrics []catalog.MonitorMetric ALL diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/databricks.yml.tmpl b/acceptance/bundle/resources/postgres_synced_tables/basic/databricks.yml.tmpl new file mode 100644 index 0000000000..9d14a96d86 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/databricks.yml.tmpl @@ -0,0 +1,38 @@ +bundle: + name: deploy-postgres-synced-table-$UNIQUE_NAME + +sync: + paths: [] + +resources: + schemas: + pipeline_storage: + name: pipeline_storage_$UNIQUE_NAME + catalog_name: main + comment: "Pipeline storage for the synced-table test" + + postgres_projects: + my_project: + project_id: test-pg-proj-$UNIQUE_NAME + display_name: "Test Project for Synced Table" + pg_version: 17 + + postgres_catalogs: + my_catalog: + catalog_id: lakebase_test_$UNIQUE_NAME + branch: ${resources.postgres_projects.my_project.id}/branches/production + postgres_database: appdb + create_database_if_missing: true + + postgres_synced_tables: + my_table: + synced_table_id: ${resources.postgres_catalogs.my_catalog.catalog_id}.public.trips_synced + source_table_full_name: main.source_$UNIQUE_NAME.trips_source + primary_key_columns: ["tpep_pickup_datetime"] + scheduling_policy: SNAPSHOT + postgres_database: appdb + branch: ${resources.postgres_projects.my_project.id}/branches/production + create_database_objects_if_missing: true + new_pipeline_spec: + storage_catalog: ${resources.schemas.pipeline_storage.catalog_name} + storage_schema: ${resources.schemas.pipeline_storage.name} diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/out.requests.direct.json b/acceptance/bundle/resources/postgres_synced_tables/basic/out.requests.direct.json new file mode 100644 index 0000000000..4e85c53cd9 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/out.requests.direct.json @@ -0,0 +1,54 @@ +{ + "method": "POST", + "path": "/api/2.0/postgres/projects", + "q": { + "project_id": "test-pg-proj-[UNIQUE_NAME]" + }, + "body": { + "spec": { + "display_name": "Test Project for Synced Table", + "pg_version": 17 + } + } +} +{ + "method": "POST", + "path": "/api/2.0/postgres/catalogs", + "q": { + "catalog_id": "lakebase_test_[UNIQUE_NAME]" + }, + "body": { + "spec": { + "branch": "projects/test-pg-proj-[UNIQUE_NAME]/branches/production", + "create_database_if_missing": true, + "postgres_database": "appdb" + } + } +} +{ + "method": "POST", + "path": "/api/2.0/postgres/synced_tables", + "q": { + "synced_table_id": "lakebase_test_[UNIQUE_NAME].public.trips_synced" + }, + "body": { + "spec": { + "branch": "projects/test-pg-proj-[UNIQUE_NAME]/branches/production", + "create_database_objects_if_missing": true, + "new_pipeline_spec": { + "storage_catalog": "main", + "storage_schema": "pipeline_storage_[UNIQUE_NAME]" + }, + "postgres_database": "appdb", + "primary_key_columns": [ + "tpep_pickup_datetime" + ], + "scheduling_policy": "SNAPSHOT", + "source_table_full_name": "main.source_[UNIQUE_NAME].trips_source" + } + } +} +{ + "method": "GET", + "path": "/api/2.0/postgres/synced_tables/lakebase_test_[UNIQUE_NAME].public.trips_synced" +} diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/out.requests.terraform.json b/acceptance/bundle/resources/postgres_synced_tables/basic/out.requests.terraform.json new file mode 100644 index 0000000000..4e85c53cd9 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/out.requests.terraform.json @@ -0,0 +1,54 @@ +{ + "method": "POST", + "path": "/api/2.0/postgres/projects", + "q": { + "project_id": "test-pg-proj-[UNIQUE_NAME]" + }, + "body": { + "spec": { + "display_name": "Test Project for Synced Table", + "pg_version": 17 + } + } +} +{ + "method": "POST", + "path": "/api/2.0/postgres/catalogs", + "q": { + "catalog_id": "lakebase_test_[UNIQUE_NAME]" + }, + "body": { + "spec": { + "branch": "projects/test-pg-proj-[UNIQUE_NAME]/branches/production", + "create_database_if_missing": true, + "postgres_database": "appdb" + } + } +} +{ + "method": "POST", + "path": "/api/2.0/postgres/synced_tables", + "q": { + "synced_table_id": "lakebase_test_[UNIQUE_NAME].public.trips_synced" + }, + "body": { + "spec": { + "branch": "projects/test-pg-proj-[UNIQUE_NAME]/branches/production", + "create_database_objects_if_missing": true, + "new_pipeline_spec": { + "storage_catalog": "main", + "storage_schema": "pipeline_storage_[UNIQUE_NAME]" + }, + "postgres_database": "appdb", + "primary_key_columns": [ + "tpep_pickup_datetime" + ], + "scheduling_policy": "SNAPSHOT", + "source_table_full_name": "main.source_[UNIQUE_NAME].trips_source" + } + } +} +{ + "method": "GET", + "path": "/api/2.0/postgres/synced_tables/lakebase_test_[UNIQUE_NAME].public.trips_synced" +} diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/out.test.toml b/acceptance/bundle/resources/postgres_synced_tables/basic/out.test.toml new file mode 100644 index 0000000000..110f841fa0 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = true +RequiresUnityCatalog = true +CloudEnvs.azure = false +CloudEnvs.gcp = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct", "terraform"] diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/output.txt b/acceptance/bundle/resources/postgres_synced_tables/basic/output.txt new file mode 100644 index 0000000000..0fb427cccc --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/output.txt @@ -0,0 +1,95 @@ +Creating temporary source table: main.source_[UNIQUE_NAME].trips_source +{ + "full_name": "main.source_[UNIQUE_NAME]" +} + +>>> [CLI] bundle validate +Name: deploy-postgres-synced-table-[UNIQUE_NAME] +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/deploy-postgres-synced-table-[UNIQUE_NAME]/default + +Validation OK! + +>>> [CLI] bundle summary +Name: deploy-postgres-synced-table-[UNIQUE_NAME] +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/deploy-postgres-synced-table-[UNIQUE_NAME]/default +Resources: + Postgres catalogs: + my_catalog: + Name: lakebase_test_[UNIQUE_NAME] + URL: [DATABRICKS_URL]/explore/data/lakebase_test_[UNIQUE_NAME] + Postgres projects: + my_project: + Name: Test Project for Synced Table + URL: (not deployed) + Postgres synced tables: + my_table: + Name: ${resources.postgres_catalogs.my_catalog.catalog_id}.public.trips_synced + URL: [DATABRICKS_URL]/explore/data/$%7Bresources/postgres_catalogs/my_catalog.catalog_id%7D.public.trips_synced + Schemas: + pipeline_storage: + Name: pipeline_storage_[UNIQUE_NAME] + URL: (not deployed) + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/deploy-postgres-synced-table-[UNIQUE_NAME]/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] postgres get-synced-table synced_tables/lakebase_test_[UNIQUE_NAME].public.trips_synced +{ + "name": "synced_tables/lakebase_test_[UNIQUE_NAME].public.trips_synced", + "unity_catalog_provisioning_state": "ACTIVE" +} + +>>> [CLI] bundle summary +Name: deploy-postgres-synced-table-[UNIQUE_NAME] +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/deploy-postgres-synced-table-[UNIQUE_NAME]/default +Resources: + Postgres catalogs: + my_catalog: + Name: lakebase_test_[UNIQUE_NAME] + URL: [DATABRICKS_URL]/explore/data/lakebase_test_[UNIQUE_NAME] + Postgres projects: + my_project: + Name: Test Project for Synced Table + URL: (not deployed) + Postgres synced tables: + my_table: + Name: ${resources.postgres_catalogs.my_catalog.catalog_id}.public.trips_synced + URL: [DATABRICKS_URL]/explore/data/$%7Bresources/postgres_catalogs/my_catalog.catalog_id%7D.public.trips_synced + Schemas: + pipeline_storage: + Name: pipeline_storage_[UNIQUE_NAME] + URL: [DATABRICKS_URL]/explore/data/main/pipeline_storage_[UNIQUE_NAME] + +>>> print_requests.py --keep --get //postgres ^//workspace-files/ ^//workspace/ ^//telemetry-ext ^//operations/ + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.postgres_catalogs.my_catalog + delete resources.postgres_projects.my_project + delete resources.postgres_synced_tables.my_table + delete resources.schemas.pipeline_storage + +This action will result in the deletion of the following UC schemas. Any underlying data may be lost: + delete resources.schemas.pipeline_storage + +This action will result in the deletion of the following Lakebase projects along with +all their branches, databases, and endpoints. All data stored in them will be permanently lost: + delete resources.postgres_projects.my_project + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/deploy-postgres-synced-table-[UNIQUE_NAME]/default + +Deleting files... +Destroy complete! +Cleaning up temporary source table diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/script b/acceptance/bundle/resources/postgres_synced_tables/basic/script new file mode 100644 index 0000000000..c487aab02b --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/script @@ -0,0 +1,39 @@ +envsubst < databricks.yml.tmpl > databricks.yml + +# Create a per-test source schema + table. We can't read samples.nyctaxi.trips +# directly because shared workspaces hit the "20 synced tables per source table" +# limit (same reason synced_database_tables takes this approach). +echo "Creating temporary source table: main.source_$UNIQUE_NAME.trips_source" +$CLI schemas create source_$UNIQUE_NAME main -o json | jq '{full_name}' +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/sql/statements/" --json "{ + \"warehouse_id\": \"$TEST_DEFAULT_WAREHOUSE_ID\", + \"statement\": \"CREATE TABLE main.source_$UNIQUE_NAME.trips_source AS SELECT * FROM samples.nyctaxi.trips LIMIT 10\", + \"wait_timeout\": \"45s\" + }" > /dev/null + +cleanup() { + trace $CLI bundle destroy --auto-approve + echo "Cleaning up temporary source table" + $CLI tables delete main.source_$UNIQUE_NAME.trips_source || true + $CLI schemas delete main.source_$UNIQUE_NAME || true + rm -f out.requests.txt +} +trap cleanup EXIT + +trace $CLI bundle validate + +trace $CLI bundle summary + +rm -f out.requests.txt +trace $CLI bundle deploy + +# Keep only the deterministic identity + provisioning-state. detailed_state +# varies with pipeline-provisioning timing on cloud, ongoing_sync_progress and +# project only appear there, and create_time/update_time/uid/pipeline_id are +# random per run. +trace $CLI postgres get-synced-table "synced_tables/lakebase_test_${UNIQUE_NAME}.public.trips_synced" | jq '{name, unity_catalog_provisioning_state: .status.unity_catalog_provisioning_state}' + +trace $CLI bundle summary + +# Filter requests to only show postgres operations (exclude workspace, telemetry, and operation polling). +trace print_requests.py --keep --get '//postgres' '^//workspace-files/' '^//workspace/' '^//telemetry-ext' '^//operations/' > out.requests.$DATABRICKS_BUNDLE_ENGINE.json diff --git a/acceptance/bundle/resources/postgres_synced_tables/basic/test.toml b/acceptance/bundle/resources/postgres_synced_tables/basic/test.toml new file mode 100644 index 0000000000..f8b3bbe49d --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/basic/test.toml @@ -0,0 +1 @@ +# All configuration inherited from parent test.toml diff --git a/acceptance/bundle/resources/postgres_synced_tables/recreate/databricks.yml.tmpl b/acceptance/bundle/resources/postgres_synced_tables/recreate/databricks.yml.tmpl new file mode 100644 index 0000000000..40a350d53f --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/recreate/databricks.yml.tmpl @@ -0,0 +1,39 @@ +bundle: + name: recreate-postgres-synced-table-$UNIQUE_NAME + +sync: + paths: [] + +resources: + schemas: + pipeline_storage: + name: pipeline_storage_$UNIQUE_NAME + catalog_name: main + comment: "Pipeline storage for the synced-table recreate test" + + postgres_projects: + my_project: + project_id: test-pg-proj-$UNIQUE_NAME + display_name: "Test Project for Synced Table Recreate" + pg_version: 17 + + postgres_catalogs: + my_catalog: + catalog_id: lakebase_test_$UNIQUE_NAME + branch: ${resources.postgres_projects.my_project.id}/branches/production + postgres_database: appdb + create_database_if_missing: true + + postgres_synced_tables: + my_table: + synced_table_id: ${resources.postgres_catalogs.my_catalog.catalog_id}.public.trips_synced + source_table_full_name: main.source_$UNIQUE_NAME.trips_source + primary_key_columns: ["tpep_pickup_datetime"] + scheduling_policy: SNAPSHOT + postgres_database: appdb + branch: ${resources.postgres_projects.my_project.id}/branches/production + create_database_objects_if_missing: true + timeseries_key: $TIMESERIES_KEY + new_pipeline_spec: + storage_catalog: ${resources.schemas.pipeline_storage.catalog_name} + storage_schema: ${resources.schemas.pipeline_storage.name} diff --git a/acceptance/bundle/resources/postgres_synced_tables/recreate/out.test.toml b/acceptance/bundle/resources/postgres_synced_tables/recreate/out.test.toml new file mode 100644 index 0000000000..110f841fa0 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/recreate/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = true +RequiresUnityCatalog = true +CloudEnvs.azure = false +CloudEnvs.gcp = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct", "terraform"] diff --git a/acceptance/bundle/resources/postgres_synced_tables/recreate/output.txt b/acceptance/bundle/resources/postgres_synced_tables/recreate/output.txt new file mode 100644 index 0000000000..7e2924062d --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/recreate/output.txt @@ -0,0 +1,41 @@ +Creating temporary source table: main.source_[UNIQUE_NAME].trips_source +{ + "full_name": "main.source_[UNIQUE_NAME]" +} + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/recreate-postgres-synced-table-[UNIQUE_NAME]/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] bundle plan +recreate postgres_synced_tables.my_table + +Plan: 1 to add, 0 to change, 1 to delete, 3 unchanged + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/recreate-postgres-synced-table-[UNIQUE_NAME]/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.postgres_catalogs.my_catalog + delete resources.postgres_projects.my_project + delete resources.postgres_synced_tables.my_table + delete resources.schemas.pipeline_storage + +This action will result in the deletion of the following UC schemas. Any underlying data may be lost: + delete resources.schemas.pipeline_storage + +This action will result in the deletion of the following Lakebase projects along with +all their branches, databases, and endpoints. All data stored in them will be permanently lost: + delete resources.postgres_projects.my_project + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/recreate-postgres-synced-table-[UNIQUE_NAME]/default + +Deleting files... +Destroy complete! +Cleaning up temporary source table diff --git a/acceptance/bundle/resources/postgres_synced_tables/recreate/script b/acceptance/bundle/resources/postgres_synced_tables/recreate/script new file mode 100644 index 0000000000..de437c0c15 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/recreate/script @@ -0,0 +1,30 @@ +# Per-test source table to avoid the 20-synced-tables-per-source-table limit. +echo "Creating temporary source table: main.source_$UNIQUE_NAME.trips_source" +$CLI schemas create source_$UNIQUE_NAME main -o json | jq '{full_name}' +MSYS_NO_PATHCONV=1 $CLI api post "/api/2.0/sql/statements/" --json "{ + \"warehouse_id\": \"$TEST_DEFAULT_WAREHOUSE_ID\", + \"statement\": \"CREATE TABLE main.source_$UNIQUE_NAME.trips_source AS SELECT * FROM samples.nyctaxi.trips LIMIT 10\", + \"wait_timeout\": \"45s\" + }" > /dev/null + +cleanup() { + trace $CLI bundle destroy --auto-approve + echo "Cleaning up temporary source table" + $CLI tables delete main.source_$UNIQUE_NAME.trips_source || true + $CLI schemas delete main.source_$UNIQUE_NAME || true + rm -f out.requests.txt +} +trap cleanup EXIT + +export TIMESERIES_KEY=tpep_pickup_datetime +envsubst < databricks.yml.tmpl > databricks.yml +trace $CLI bundle deploy + +# Toggle a recreate-on-change field; plan must show delete + create. +# We toggle timeseries_key (not scheduling_policy) so we don't need CDF +# on the source. +export TIMESERIES_KEY=tpep_dropoff_datetime +envsubst < databricks.yml.tmpl > databricks.yml +trace $CLI bundle plan | contains.py "Plan: 1 to add, 0 to change, 1 to delete, 3 unchanged" + +trace $CLI bundle deploy diff --git a/acceptance/bundle/resources/postgres_synced_tables/recreate/test.toml b/acceptance/bundle/resources/postgres_synced_tables/recreate/test.toml new file mode 100644 index 0000000000..f8b3bbe49d --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/recreate/test.toml @@ -0,0 +1 @@ +# All configuration inherited from parent test.toml diff --git a/acceptance/bundle/resources/postgres_synced_tables/test.toml b/acceptance/bundle/resources/postgres_synced_tables/test.toml new file mode 100644 index 0000000000..394df07ae0 --- /dev/null +++ b/acceptance/bundle/resources/postgres_synced_tables/test.toml @@ -0,0 +1,35 @@ +Local = true +Cloud = true +RequiresUnityCatalog = true + +# Lakebase v2 (postgres) is only available in AWS as of January 2026 +CloudEnvs.gcp = false +CloudEnvs.azure = false + +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct", "terraform"] + +Ignore = [ + "databricks.yml", + ".databricks", +] + +[[Repls]] +# Clean up ?o= suffix after URL since not all workspaces have that +Old = '\?o=\[(NUMID|ALPHANUMID)\]' +New = '' +Order = 1000 + +[[Repls]] +# Normalize postgres operation IDs (unique per operation). +Old = '/operations/[A-Za-z0-9+/=-]+' +New = '/operations/[OPERATION_ID]' +Order = 2000 + +# Fake SQL endpoint for the per-test source table create in script. +[[Server]] +Pattern = "POST /api/2.0/sql/statements/" +Response.Body = '{"status": {"state": "SUCCEEDED"}, "manifest": {"schema": {"columns": []}}}' + +[[Server]] +Pattern = "DELETE /api/2.1/unity-catalog/tables/{full_name}" +Response.Body = '{"status": "OK"}' diff --git a/bundle/config/mutator/resourcemutator/apply_bundle_permissions_test.go b/bundle/config/mutator/resourcemutator/apply_bundle_permissions_test.go index d78c1ad814..d3fbbbc2cf 100644 --- a/bundle/config/mutator/resourcemutator/apply_bundle_permissions_test.go +++ b/bundle/config/mutator/resourcemutator/apply_bundle_permissions_test.go @@ -29,6 +29,7 @@ var unsupportedResources = []string{ "postgres_branches", "postgres_endpoints", "postgres_catalogs", + "postgres_synced_tables", } func TestApplyBundlePermissions(t *testing.T) { diff --git a/bundle/config/mutator/resourcemutator/apply_target_mode_test.go b/bundle/config/mutator/resourcemutator/apply_target_mode_test.go index 927c7a1913..ab6a80f5e7 100644 --- a/bundle/config/mutator/resourcemutator/apply_target_mode_test.go +++ b/bundle/config/mutator/resourcemutator/apply_target_mode_test.go @@ -254,6 +254,13 @@ func mockBundle(mode config.Mode) *bundle.Bundle { }, }, }, + PostgresSyncedTables: map[string]*resources.PostgresSyncedTable{ + "postgres_synced_table1": { + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "catalog.schema.table1", + }, + }, + }, VectorSearchEndpoints: map[string]*resources.VectorSearchEndpoint{ "vs_endpoint1": { CreateEndpoint: vectorsearch.CreateEndpoint{ @@ -449,6 +456,7 @@ func TestAppropriateResourcesAreRenamed(t *testing.T) { "PostgresBranches", "PostgresEndpoints", "PostgresCatalogs", + "PostgresSyncedTables", } diags := bundle.ApplySeq(t.Context(), b, ApplyTargetMode(), ApplyPresets()) diff --git a/bundle/config/mutator/resourcemutator/run_as_test.go b/bundle/config/mutator/resourcemutator/run_as_test.go index 58b27113a5..e80edd6711 100644 --- a/bundle/config/mutator/resourcemutator/run_as_test.go +++ b/bundle/config/mutator/resourcemutator/run_as_test.go @@ -49,6 +49,7 @@ func allResourceTypes(t *testing.T) []string { "postgres_catalogs", "postgres_endpoints", "postgres_projects", + "postgres_synced_tables", "quality_monitors", "registered_models", "schemas", @@ -178,6 +179,7 @@ var allowList = []string{ "postgres_catalogs", "postgres_endpoints", "postgres_projects", + "postgres_synced_tables", "registered_models", "experiments", "schemas", diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 30a7f4e95e..636186ddb7 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -36,6 +36,7 @@ type Resources struct { PostgresBranches map[string]*resources.PostgresBranch `json:"postgres_branches,omitempty"` PostgresEndpoints map[string]*resources.PostgresEndpoint `json:"postgres_endpoints,omitempty"` PostgresCatalogs map[string]*resources.PostgresCatalog `json:"postgres_catalogs,omitempty"` + PostgresSyncedTables map[string]*resources.PostgresSyncedTable `json:"postgres_synced_tables,omitempty"` VectorSearchEndpoints map[string]*resources.VectorSearchEndpoint `json:"vector_search_endpoints,omitempty"` } @@ -114,6 +115,7 @@ func (r *Resources) AllResources() []ResourceGroup { collectResourceMap(descriptions["postgres_branches"], r.PostgresBranches), collectResourceMap(descriptions["postgres_endpoints"], r.PostgresEndpoints), collectResourceMap(descriptions["postgres_catalogs"], r.PostgresCatalogs), + collectResourceMap(descriptions["postgres_synced_tables"], r.PostgresSyncedTables), collectResourceMap(descriptions["vector_search_endpoints"], r.VectorSearchEndpoints), } } @@ -170,6 +172,7 @@ func SupportedResources() map[string]resources.ResourceDescription { "postgres_branches": (&resources.PostgresBranch{}).ResourceDescription(), "postgres_endpoints": (&resources.PostgresEndpoint{}).ResourceDescription(), "postgres_catalogs": (&resources.PostgresCatalog{}).ResourceDescription(), + "postgres_synced_tables": (&resources.PostgresSyncedTable{}).ResourceDescription(), "vector_search_endpoints": (&resources.VectorSearchEndpoint{}).ResourceDescription(), } } diff --git a/bundle/config/resources/postgres_synced_table.go b/bundle/config/resources/postgres_synced_table.go new file mode 100644 index 0000000000..8c01058ea1 --- /dev/null +++ b/bundle/config/resources/postgres_synced_table.go @@ -0,0 +1,79 @@ +package resources + +import ( + "context" + "net/url" + "strings" + + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/marshal" + "github.com/databricks/databricks-sdk-go/service/postgres" +) + +type PostgresSyncedTableConfig struct { + postgres.SyncedTableSyncedTableSpec + + // SyncedTableId is the user-specified three-part UC name (catalog.schema.table). + // Becomes the trailing component of the server-assigned Name: + // "synced_tables/{synced_table_id}". + SyncedTableId string `json:"synced_table_id"` +} + +func (c *PostgresSyncedTableConfig) UnmarshalJSON(b []byte) error { + return marshal.Unmarshal(b, c) +} + +func (c *PostgresSyncedTableConfig) MarshalJSON() ([]byte, error) { + return marshal.Marshal(c) +} + +type PostgresSyncedTable struct { + BaseResource + PostgresSyncedTableConfig +} + +func (s *PostgresSyncedTable) Exists(ctx context.Context, w *databricks.WorkspaceClient, name string) (bool, error) { + _, err := w.Postgres.GetSyncedTable(ctx, postgres.GetSyncedTableRequest{Name: name}) + if err != nil { + log.Debugf(ctx, "postgres synced table %s does not exist", name) + return false, err + } + return true, nil +} + +func (s *PostgresSyncedTable) ResourceDescription() ResourceDescription { + return ResourceDescription{ + SingularName: "postgres_synced_table", + PluralName: "postgres_synced_tables", + SingularTitle: "Postgres synced table", + PluralTitle: "Postgres synced tables", + } +} + +func (s *PostgresSyncedTable) GetName() string { + // Synced tables don't expose a display name distinct from their three-part id. + return s.SyncedTableId +} + +func (s *PostgresSyncedTable) GetURL() string { + return s.URL +} + +func (s *PostgresSyncedTable) InitializeURL(baseURL url.URL) { + if s.SyncedTableId == "" { + return + } + // SyncedTableId is a three-part UC name (catalog.schema.table). UC explore + // expects the segments as path components, not a single dotted segment. + catalog, rest, ok := strings.Cut(s.SyncedTableId, ".") + if !ok { + return + } + schema, name, ok := strings.Cut(rest, ".") + if !ok { + return + } + baseURL.Path = "explore/data/" + catalog + "/" + schema + "/" + name + s.URL = baseURL.String() +} diff --git a/bundle/config/resources_test.go b/bundle/config/resources_test.go index 626fac0798..1be4a84a1f 100644 --- a/bundle/config/resources_test.go +++ b/bundle/config/resources_test.go @@ -280,6 +280,13 @@ func TestResourcesBindSupport(t *testing.T) { }, }, }, + PostgresSyncedTables: map[string]*resources.PostgresSyncedTable{ + "my_postgres_synced_table": { + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "catalog.schema.my_postgres_synced_table", + }, + }, + }, VectorSearchEndpoints: map[string]*resources.VectorSearchEndpoint{ "my_vector_search_endpoint": { CreateEndpoint: vectorsearch.CreateEndpoint{ @@ -320,6 +327,7 @@ func TestResourcesBindSupport(t *testing.T) { m.GetMockPostgresAPI().EXPECT().GetBranch(mock.Anything, mock.Anything).Return(nil, nil) m.GetMockPostgresAPI().EXPECT().GetEndpoint(mock.Anything, mock.Anything).Return(nil, nil) m.GetMockPostgresAPI().EXPECT().GetCatalog(mock.Anything, mock.Anything).Return(nil, nil) + m.GetMockPostgresAPI().EXPECT().GetSyncedTable(mock.Anything, mock.Anything).Return(nil, nil) m.GetMockVectorSearchEndpointsAPI().EXPECT().GetEndpoint(mock.Anything, mock.Anything).Return(nil, nil) allResources := supportedResources.AllResources() diff --git a/bundle/deploy/terraform/interpolate.go b/bundle/deploy/terraform/interpolate.go index 7a1acb3d4e..92df9e61cc 100644 --- a/bundle/deploy/terraform/interpolate.go +++ b/bundle/deploy/terraform/interpolate.go @@ -16,7 +16,7 @@ type interpolateMutator struct{} // Postgres resources use "name" instead of "id" as their identifier attribute. func isPostgresResource(resourceType string) bool { switch resourceType { - case "postgres_projects", "postgres_branches", "postgres_endpoints", "postgres_catalogs": + case "postgres_projects", "postgres_branches", "postgres_endpoints", "postgres_catalogs", "postgres_synced_tables": return true default: return false diff --git a/bundle/deploy/terraform/pkg.go b/bundle/deploy/terraform/pkg.go index ff68c1b151..d8dd56c04e 100644 --- a/bundle/deploy/terraform/pkg.go +++ b/bundle/deploy/terraform/pkg.go @@ -130,6 +130,7 @@ var GroupToTerraformName = map[string]string{ "postgres_branches": "databricks_postgres_branch", "postgres_endpoints": "databricks_postgres_endpoint", "postgres_catalogs": "databricks_postgres_catalog", + "postgres_synced_tables": "databricks_postgres_synced_table", // 3 level groups: resources.*.GROUP "permissions": "databricks_permissions", diff --git a/bundle/deploy/terraform/tfdyn/convert_postgres_synced_table.go b/bundle/deploy/terraform/tfdyn/convert_postgres_synced_table.go new file mode 100644 index 0000000000..2f77729429 --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_postgres_synced_table.go @@ -0,0 +1,56 @@ +package tfdyn + +import ( + "context" + + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" +) + +type postgresSyncedTableConverter struct{} + +func (c postgresSyncedTableConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error { + // The bundle config has flattened SyncedTableSyncedTableSpec fields at the top level. + // Terraform expects them nested in a "spec" block. + specFields := specFieldNames(schema.ResourcePostgresSyncedTableSpec{}) + topLevelFields := []string{"synced_table_id"} + + specMap := make(map[string]dyn.Value) + for _, field := range specFields { + if v := vin.Get(field); v.Kind() != dyn.KindInvalid { + specMap[field] = v + } + } + + outMap := make(map[string]dyn.Value) + for _, field := range topLevelFields { + if v := vin.Get(field); v.Kind() != dyn.KindInvalid { + outMap[field] = v + } + } + if len(specMap) > 0 { + outMap["spec"] = dyn.V(specMap) + } + + vout := dyn.V(outMap) + + vout, diags := convert.Normalize(schema.ResourcePostgresSyncedTable{}, vout) + for _, diag := range diags { + log.Debugf(ctx, "postgres synced table normalization diagnostic: %s", diag.Summary) + } + + vout, err := convertLifecycle(ctx, vout, vin.Get("lifecycle")) + if err != nil { + return err + } + + out.PostgresSyncedTable[key] = vout.AsAny() + + return nil +} + +func init() { + registerConverter("postgres_synced_tables", postgresSyncedTableConverter{}) +} diff --git a/bundle/deploy/terraform/tfdyn/convert_postgres_synced_table_test.go b/bundle/deploy/terraform/tfdyn/convert_postgres_synced_table_test.go new file mode 100644 index 0000000000..84e854e6e0 --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_postgres_synced_table_test.go @@ -0,0 +1,95 @@ +package tfdyn + +import ( + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/databricks-sdk-go/service/postgres" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertPostgresSyncedTable(t *testing.T) { + src := resources.PostgresSyncedTable{ + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "shop_lakebase.public.orders_synced", + SyncedTableSyncedTableSpec: postgres.SyncedTableSyncedTableSpec{ + Branch: "projects/my-shop/branches/production", + PostgresDatabase: "appdb", + SourceTableFullName: "main.raw.orders", + PrimaryKeyColumns: []string{"order_id"}, + TimeseriesKey: "updated_at", + SchedulingPolicy: postgres.SyncedTableSyncedTableSpecSyncedTableSchedulingPolicySnapshot, + CreateDatabaseObjectsIfMissing: true, + NewPipelineSpec: &postgres.NewPipelineSpec{ + StorageCatalog: "main", + StorageSchema: "pipelines", + }, + }, + }, + } + + vin, err := convert.FromTyped(src, dyn.NilValue) + require.NoError(t, err) + + ctx := t.Context() + out := schema.NewResources() + err = postgresSyncedTableConverter{}.Convert(ctx, "my_postgres_synced_table", vin, out) + require.NoError(t, err) + + postgresSyncedTable := out.PostgresSyncedTable["my_postgres_synced_table"] + assert.Equal(t, map[string]any{ + "synced_table_id": "shop_lakebase.public.orders_synced", + "spec": map[string]any{ + "branch": "projects/my-shop/branches/production", + "postgres_database": "appdb", + "source_table_full_name": "main.raw.orders", + "primary_key_columns": []any{"order_id"}, + "timeseries_key": "updated_at", + "scheduling_policy": "SNAPSHOT", + "create_database_objects_if_missing": true, + "new_pipeline_spec": map[string]any{ + "storage_catalog": "main", + "storage_schema": "pipelines", + }, + }, + }, postgresSyncedTable) +} + +func TestConvertPostgresSyncedTableMinimal(t *testing.T) { + src := resources.PostgresSyncedTable{ + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "shop_lakebase.public.orders_synced", + SyncedTableSyncedTableSpec: postgres.SyncedTableSyncedTableSpec{ + Branch: "projects/my-shop/branches/production", + PostgresDatabase: "appdb", + SourceTableFullName: "main.raw.orders", + PrimaryKeyColumns: []string{"order_id"}, + SchedulingPolicy: postgres.SyncedTableSyncedTableSpecSyncedTableSchedulingPolicySnapshot, + }, + }, + } + + vin, err := convert.FromTyped(src, dyn.NilValue) + require.NoError(t, err) + + ctx := t.Context() + out := schema.NewResources() + err = postgresSyncedTableConverter{}.Convert(ctx, "minimal_postgres_synced_table", vin, out) + require.NoError(t, err) + + postgresSyncedTable := out.PostgresSyncedTable["minimal_postgres_synced_table"] + assert.Equal(t, map[string]any{ + "synced_table_id": "shop_lakebase.public.orders_synced", + "spec": map[string]any{ + "branch": "projects/my-shop/branches/production", + "postgres_database": "appdb", + "source_table_full_name": "main.raw.orders", + "primary_key_columns": []any{"order_id"}, + "scheduling_policy": "SNAPSHOT", + }, + }, postgresSyncedTable) +} diff --git a/bundle/deploy/terraform/util.go b/bundle/deploy/terraform/util.go index 69c3c4f886..7ca5e9a1d1 100644 --- a/bundle/deploy/terraform/util.go +++ b/bundle/deploy/terraform/util.go @@ -96,7 +96,7 @@ func parseResourcesState(ctx context.Context, path string) (ExportedResourcesMap // The direct engine manages permissions as a sub-resource // (SecretScopeFixups adds MANAGE ACL for the current user). result[resourceKey+".permissions"] = ResourceState{ID: instance.Attributes.Name} - case "apps", "database_instances", "database_catalogs", "synced_database_tables", "postgres_projects", "postgres_branches", "postgres_endpoints", "postgres_catalogs": + case "apps", "database_instances", "database_catalogs", "synced_database_tables", "postgres_projects", "postgres_branches", "postgres_endpoints", "postgres_catalogs", "postgres_synced_tables": resourceKey = "resources." + groupName + "." + resource.Name resourceState = ResourceState{ID: instance.Attributes.Name} case "dashboards": diff --git a/bundle/direct/dresources/all.go b/bundle/direct/dresources/all.go index 497bf9bcb7..799e5ae14d 100644 --- a/bundle/direct/dresources/all.go +++ b/bundle/direct/dresources/all.go @@ -24,6 +24,7 @@ var SupportedResources = map[string]any{ "postgres_branches": (*ResourcePostgresBranch)(nil), "postgres_endpoints": (*ResourcePostgresEndpoint)(nil), "postgres_catalogs": (*ResourcePostgresCatalog)(nil), + "postgres_synced_tables": (*ResourcePostgresSyncedTable)(nil), "alerts": (*ResourceAlert)(nil), "clusters": (*ResourceCluster)(nil), "registered_models": (*ResourceRegisteredModel)(nil), diff --git a/bundle/direct/dresources/all_test.go b/bundle/direct/dresources/all_test.go index f130afc619..6e4a257e32 100644 --- a/bundle/direct/dresources/all_test.go +++ b/bundle/direct/dresources/all_test.go @@ -197,6 +197,12 @@ var testConfig map[string]any = map[string]any{ }, }, + "postgres_synced_tables": &resources.PostgresSyncedTable{ + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "main.public.trips_synced", + }, + }, + "alerts": &resources.Alert{ AlertV2: sql.AlertV2{ DisplayName: "my-alert", diff --git a/bundle/direct/dresources/apitypes.generated.yml b/bundle/direct/dresources/apitypes.generated.yml index 6478f029fe..767ad08049 100644 --- a/bundle/direct/dresources/apitypes.generated.yml +++ b/bundle/direct/dresources/apitypes.generated.yml @@ -34,6 +34,8 @@ postgres_endpoints: postgres.EndpointSpec postgres_projects: postgres.ProjectStatus +postgres_synced_tables: postgres.SyncedTableSyncedTableSpec + quality_monitors: catalog.CreateMonitor registered_models: catalog.RegisteredModelInfo diff --git a/bundle/direct/dresources/apitypes.yml b/bundle/direct/dresources/apitypes.yml index c37dfbccbb..7d478be47f 100644 --- a/bundle/direct/dresources/apitypes.yml +++ b/bundle/direct/dresources/apitypes.yml @@ -11,3 +11,5 @@ postgres_endpoints: postgres.EndpointSpec postgres_projects: postgres.ProjectSpec postgres_catalogs: postgres.CatalogCatalogSpec + +postgres_synced_tables: postgres.SyncedTableSyncedTableSpec diff --git a/bundle/direct/dresources/postgres_synced_table.go b/bundle/direct/dresources/postgres_synced_table.go new file mode 100644 index 0000000000..cd4c4cc78d --- /dev/null +++ b/bundle/direct/dresources/postgres_synced_table.go @@ -0,0 +1,129 @@ +package dresources + +import ( + "context" + "strings" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go" + sdktime "github.com/databricks/databricks-sdk-go/common/types/time" + "github.com/databricks/databricks-sdk-go/marshal" + "github.com/databricks/databricks-sdk-go/service/postgres" +) + +// PostgresSyncedTableRemote is the return type for DoRead. It embeds +// SyncedTableSyncedTableSpec so that all paths in StateType are valid paths in +// RemoteType, enabling drift detection for spec fields once the backend echoes +// spec on GET. +type PostgresSyncedTableRemote struct { + postgres.SyncedTableSyncedTableSpec + + SyncedTableId string `json:"synced_table_id,omitempty"` + + Name string `json:"name,omitempty"` + Status *postgres.SyncedTableSyncedTableStatus `json:"status,omitempty"` + Uid string `json:"uid,omitempty"` + CreateTime *sdktime.Time `json:"create_time,omitempty"` +} + +// Custom marshaler needed because embedded SyncedTableSyncedTableSpec has its own +// MarshalJSON which would otherwise take over and ignore the additional fields. +func (s *PostgresSyncedTableRemote) UnmarshalJSON(b []byte) error { + return marshal.Unmarshal(b, s) +} + +func (s PostgresSyncedTableRemote) MarshalJSON() ([]byte, error) { + return marshal.Marshal(s) +} + +type ResourcePostgresSyncedTable struct { + client *databricks.WorkspaceClient +} + +type PostgresSyncedTableState = resources.PostgresSyncedTableConfig + +func (*ResourcePostgresSyncedTable) New(client *databricks.WorkspaceClient) *ResourcePostgresSyncedTable { + return &ResourcePostgresSyncedTable{client: client} +} + +func (*ResourcePostgresSyncedTable) PrepareState(input *resources.PostgresSyncedTable) *PostgresSyncedTableState { + return &PostgresSyncedTableState{ + SyncedTableId: input.SyncedTableId, + SyncedTableSyncedTableSpec: input.SyncedTableSyncedTableSpec, + } +} + +func (*ResourcePostgresSyncedTable) RemapState(remote *PostgresSyncedTableRemote) *PostgresSyncedTableState { + return &PostgresSyncedTableState{ + SyncedTableId: remote.SyncedTableId, + SyncedTableSyncedTableSpec: remote.SyncedTableSyncedTableSpec, + } +} + +// makePostgresSyncedTableRemote converts the SDK SyncedTable into the embedded +// remote shape. GET does not echo spec today (only status is returned); the +// embedded spec fields stay at their zero values, and resources.yml suppresses +// phantom drift via ignore_remote_changes with reason spec:input_only. +// +// Unlike postgres_catalogs (which has Status.CatalogId), the synced-table API +// doesn't expose the user-facing id as a named field. It only appears as the +// trailing component of remote.Name, so we strip the constant "synced_tables/" +// prefix. +func makePostgresSyncedTableRemote(syncedTable *postgres.SyncedTable) *PostgresSyncedTableRemote { + var spec postgres.SyncedTableSyncedTableSpec + if syncedTable.Spec != nil { + spec = *syncedTable.Spec + } + return &PostgresSyncedTableRemote{ + SyncedTableSyncedTableSpec: spec, + SyncedTableId: strings.TrimPrefix(syncedTable.Name, "synced_tables/"), + Name: syncedTable.Name, + Status: syncedTable.Status, + Uid: syncedTable.Uid, + CreateTime: syncedTable.CreateTime, + } +} + +func (r *ResourcePostgresSyncedTable) DoRead(ctx context.Context, id string) (*PostgresSyncedTableRemote, error) { + syncedTable, err := r.client.Postgres.GetSyncedTable(ctx, postgres.GetSyncedTableRequest{Name: id}) + if err != nil { + return nil, err + } + return makePostgresSyncedTableRemote(syncedTable), nil +} + +func (r *ResourcePostgresSyncedTable) DoCreate(ctx context.Context, config *PostgresSyncedTableState) (string, *PostgresSyncedTableRemote, error) { + waiter, err := r.client.Postgres.CreateSyncedTable(ctx, postgres.CreateSyncedTableRequest{ + SyncedTableId: config.SyncedTableId, + SyncedTable: postgres.SyncedTable{ + Spec: &config.SyncedTableSyncedTableSpec, + + // Output-only fields. + CreateTime: nil, + Name: "", + Status: nil, + Uid: "", + ForceSendFields: nil, + }, + }) + if err != nil { + return "", nil, err + } + + result, err := waiter.Wait(ctx) + if err != nil { + return "", nil, err + } + remote := makePostgresSyncedTableRemote(result) + return remote.Name, remote, nil +} + +func (r *ResourcePostgresSyncedTable) DoDelete(ctx context.Context, id string) error { + waiter, err := r.client.Postgres.DeleteSyncedTable(ctx, postgres.DeleteSyncedTableRequest{ + Name: id, + }) + if err != nil { + return err + } + return waiter.Wait(ctx) +} diff --git a/bundle/direct/dresources/resources.generated.yml b/bundle/direct/dresources/resources.generated.yml index ee9a4892d9..03f1950967 100644 --- a/bundle/direct/dresources/resources.generated.yml +++ b/bundle/direct/dresources/resources.generated.yml @@ -270,6 +270,28 @@ resources: - field: pg_version reason: spec:input_only + postgres_synced_tables: + + ignore_remote_changes: + - field: branch + reason: spec:input_only + - field: create_database_objects_if_missing + reason: spec:input_only + - field: existing_pipeline_id + reason: spec:input_only + - field: new_pipeline_spec + reason: spec:input_only + - field: postgres_database + reason: spec:input_only + - field: primary_key_columns + reason: spec:input_only + - field: scheduling_policy + reason: spec:input_only + - field: source_table_full_name + reason: spec:input_only + - field: timeseries_key + reason: spec:input_only + # quality_monitors: no api field behaviors # registered_models: no api field behaviors diff --git a/bundle/direct/dresources/resources.yml b/bundle/direct/dresources/resources.yml index 8c8a6ce07c..eb5cd9809d 100644 --- a/bundle/direct/dresources/resources.yml +++ b/bundle/direct/dresources/resources.yml @@ -526,6 +526,36 @@ resources: - field: create_database_if_missing reason: immutable + postgres_synced_tables: + # The Postgres API has no UpdateSyncedTable endpoint, so every settable + # field is recreate-only on the intent side (local YAML edit -> delete + + # create). The complementary ignore_remote_changes block for this resource + # lives in resources.generated.yml and handles the read side: it suppresses + # drift for the same fields because the GET API does not echo back the + # spec. Together they make no-op deploys idempotent while a real config + # edit still triggers a recreate. Same pattern as secret_scopes. + recreate_on_changes: + - field: synced_table_id + reason: immutable + - field: branch + reason: immutable + - field: postgres_database + reason: immutable + - field: source_table_full_name + reason: immutable + - field: primary_key_columns + reason: immutable + - field: timeseries_key + reason: immutable + - field: scheduling_policy + reason: immutable + - field: create_database_objects_if_missing + reason: immutable + - field: new_pipeline_spec + reason: immutable + - field: existing_pipeline_id + reason: immutable + vector_search_endpoints: recreate_on_changes: - field: endpoint_type diff --git a/bundle/internal/schema/annotations.yml b/bundle/internal/schema/annotations.yml index 0f2e1b0c79..041ba102dd 100644 --- a/bundle/internal/schema/annotations.yml +++ b/bundle/internal/schema/annotations.yml @@ -224,6 +224,9 @@ github.com/databricks/cli/bundle/config.Resources: "postgres_projects": "description": |- PLACEHOLDER + "postgres_synced_tables": + "description": |- + The Postgres synced table definitions for the bundle, where each key is the name of the synced table. Each entry continuously replicates a Unity Catalog Delta source table into a Postgres table on a Lakebase Autoscaling instance. "quality_monitors": "description": |- The quality monitor definitions for the bundle, where each key is the name of the quality monitor. @@ -922,6 +925,40 @@ github.com/databricks/cli/bundle/config/resources.PostgresProject: "update_time": "description": |- PLACEHOLDER +github.com/databricks/cli/bundle/config/resources.PostgresSyncedTable: + "branch": + "description": |- + PLACEHOLDER + "create_database_objects_if_missing": + "description": |- + PLACEHOLDER + "existing_pipeline_id": + "description": |- + PLACEHOLDER + "lifecycle": + "description": |- + PLACEHOLDER + "new_pipeline_spec": + "description": |- + PLACEHOLDER + "postgres_database": + "description": |- + PLACEHOLDER + "primary_key_columns": + "description": |- + PLACEHOLDER + "scheduling_policy": + "description": |- + PLACEHOLDER + "source_table_full_name": + "description": |- + PLACEHOLDER + "synced_table_id": + "description": |- + PLACEHOLDER + "timeseries_key": + "description": |- + PLACEHOLDER github.com/databricks/cli/bundle/config/resources.SecretScope: "backend_type": "description": |- diff --git a/bundle/internal/validation/generated/enum_fields.go b/bundle/internal/validation/generated/enum_fields.go index 2f1593a890..bd481f9f2e 100644 --- a/bundle/internal/validation/generated/enum_fields.go +++ b/bundle/internal/validation/generated/enum_fields.go @@ -184,6 +184,8 @@ var EnumFields = map[string][]string{ "resources.postgres_projects.*.permissions[*].level": {"CAN_ATTACH_TO", "CAN_BIND", "CAN_CREATE", "CAN_CREATE_APP", "CAN_EDIT", "CAN_EDIT_METADATA", "CAN_MANAGE", "CAN_MANAGE_PRODUCTION_VERSIONS", "CAN_MANAGE_RUN", "CAN_MANAGE_STAGING_VERSIONS", "CAN_MONITOR", "CAN_MONITOR_ONLY", "CAN_QUERY", "CAN_READ", "CAN_RESTART", "CAN_RUN", "CAN_USE", "CAN_VIEW", "CAN_VIEW_METADATA", "IS_OWNER"}, + "resources.postgres_synced_tables.*.scheduling_policy": {"CONTINUOUS", "SNAPSHOT", "TRIGGERED"}, + "resources.quality_monitors.*.custom_metrics[*].type": {"CUSTOM_METRIC_TYPE_AGGREGATE", "CUSTOM_METRIC_TYPE_DERIVED", "CUSTOM_METRIC_TYPE_DRIFT"}, "resources.quality_monitors.*.inference_log.problem_type": {"PROBLEM_TYPE_CLASSIFICATION", "PROBLEM_TYPE_REGRESSION"}, "resources.quality_monitors.*.schedule.pause_status": {"PAUSED", "UNPAUSED", "UNSPECIFIED"}, diff --git a/bundle/internal/validation/generated/required_fields.go b/bundle/internal/validation/generated/required_fields.go index ad2edcd333..ae6da95317 100644 --- a/bundle/internal/validation/generated/required_fields.go +++ b/bundle/internal/validation/generated/required_fields.go @@ -227,6 +227,8 @@ var RequiredFields = map[string][]string{ "resources.postgres_projects.*": {"project_id"}, "resources.postgres_projects.*.permissions[*]": {"level"}, + "resources.postgres_synced_tables.*": {"synced_table_id"}, + "resources.quality_monitors.*": {"assets_dir", "output_schema_name", "table_name"}, "resources.quality_monitors.*.custom_metrics[*]": {"definition", "input_columns", "name", "output_data_type", "type"}, "resources.quality_monitors.*.inference_log": {"granularities", "model_id_col", "prediction_col", "problem_type", "timestamp_col"}, diff --git a/bundle/schema/jsonschema.json b/bundle/schema/jsonschema.json index 389efd115b..414e70fedd 100644 --- a/bundle/schema/jsonschema.json +++ b/bundle/schema/jsonschema.json @@ -1550,6 +1550,56 @@ } ] }, + "resources.PostgresSyncedTable": { + "oneOf": [ + { + "type": "object", + "properties": { + "branch": { + "$ref": "#/$defs/string" + }, + "create_database_objects_if_missing": { + "$ref": "#/$defs/bool" + }, + "existing_pipeline_id": { + "$ref": "#/$defs/string" + }, + "lifecycle": { + "$ref": "#/$defs/github.com/databricks/cli/bundle/config/resources.Lifecycle" + }, + "new_pipeline_spec": { + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/postgres.NewPipelineSpec" + }, + "postgres_database": { + "$ref": "#/$defs/string" + }, + "primary_key_columns": { + "$ref": "#/$defs/slice/string" + }, + "scheduling_policy": { + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/postgres.SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy" + }, + "source_table_full_name": { + "$ref": "#/$defs/string" + }, + "synced_table_id": { + "$ref": "#/$defs/string" + }, + "timeseries_key": { + "$ref": "#/$defs/string" + } + }, + "additionalProperties": false, + "required": [ + "synced_table_id" + ] + }, + { + "type": "string", + "pattern": "\\$\\{(var(\\.[a-zA-Z]+([-_]?[a-zA-Z0-9]+)*(\\[[0-9]+\\])*)+)\\}" + } + ] + }, "resources.QualityMonitor": { "oneOf": [ { @@ -2571,6 +2621,10 @@ "postgres_projects": { "$ref": "#/$defs/map/github.com/databricks/cli/bundle/config/resources.PostgresProject" }, + "postgres_synced_tables": { + "description": "The Postgres synced table definitions for the bundle, where each key is the name of the synced table. Each entry continuously replicates a Unity Catalog Delta source table into a Postgres table on a Lakebase Autoscaling instance.", + "$ref": "#/$defs/map/github.com/databricks/cli/bundle/config/resources.PostgresSyncedTable" + }, "quality_monitors": { "description": "The quality monitor definitions for the bundle, where each key is the name of the quality monitor.", "$ref": "#/$defs/map/github.com/databricks/cli/bundle/config/resources.QualityMonitor", @@ -9863,6 +9917,29 @@ } ] }, + "postgres.NewPipelineSpec": { + "oneOf": [ + { + "type": "object", + "properties": { + "budget_policy_id": { + "$ref": "#/$defs/string" + }, + "storage_catalog": { + "$ref": "#/$defs/string" + }, + "storage_schema": { + "$ref": "#/$defs/string" + } + }, + "additionalProperties": false + }, + { + "type": "string", + "pattern": "\\$\\{(var(\\.[a-zA-Z]+([-_]?[a-zA-Z0-9]+)*(\\[[0-9]+\\])*)+)\\}" + } + ] + }, "postgres.ProjectCustomTag": { "oneOf": [ { @@ -9920,6 +9997,9 @@ } ] }, + "postgres.SyncedTableSyncedTableSpecSyncedTableSchedulingPolicy": { + "type": "string" + }, "serving.Ai21LabsConfig": { "oneOf": [ { @@ -11829,6 +11909,20 @@ } ] }, + "resources.PostgresSyncedTable": { + "oneOf": [ + { + "type": "object", + "additionalProperties": { + "$ref": "#/$defs/github.com/databricks/cli/bundle/config/resources.PostgresSyncedTable" + } + }, + { + "type": "string", + "pattern": "\\$\\{(var(\\.[a-zA-Z]+([-_]?[a-zA-Z0-9]+)*(\\[[0-9]+\\])*)+)\\}" + } + ] + }, "resources.QualityMonitor": { "oneOf": [ { diff --git a/bundle/schema/jsonschema_for_docs.json b/bundle/schema/jsonschema_for_docs.json index ab06243d22..ff64ac25c2 100644 --- a/bundle/schema/jsonschema_for_docs.json +++ b/bundle/schema/jsonschema_for_docs.json @@ -1967,7 +1967,8 @@ "x-since-version": "v0.298.0" }, "target_qps": { - "$ref": "#/$defs/int64" + "$ref": "#/$defs/int64", + "x-since-version": "v0.299.2" }, "usage_policy_id": { "$ref": "#/$defs/string", @@ -4315,7 +4316,8 @@ "description": "The confidential computing technology for this cluster's instances.\nCurrently only SEV_SNP is supported, and only on N2D instance types.\nWhen not set, no confidential computing is applied.", "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/compute.ConfidentialComputeType", "x-databricks-preview": "PRIVATE", - "doNotSuggest": true + "doNotSuggest": true, + "x-since-version": "v0.299.2" }, "first_on_demand": { "description": "The first `first_on_demand` nodes of the cluster will be placed on on-demand instances.\nThis value should be greater than 0, to make sure the cluster driver node is placed on an\non-demand instance. If this value is greater than or equal to the current cluster size, all\nnodes will be placed on on-demand instances. If this value is less than the current cluster\nsize, `first_on_demand` nodes will be placed on on-demand instances and the remainder will\nbe placed on `availability` instances. Note that this value does not affect\ncluster size and cannot currently be mutated over the lifetime of a cluster.", @@ -6788,7 +6790,8 @@ "properties": { "include_confluence_spaces": { "description": "(Optional) Spaces to filter Confluence data on", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -6812,7 +6815,8 @@ "properties": { "confluence_options": { "description": "Confluence specific options for ingestion", - "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.ConfluenceConnectorOptions" + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.ConfluenceConnectorOptions", + "x-since-version": "v0.299.2" }, "gdrive_options": { "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.GoogleDriveOptions", @@ -6829,17 +6833,20 @@ }, "jira_options": { "description": "Jira specific options for ingestion", - "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.JiraConnectorOptions" + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.JiraConnectorOptions", + "x-since-version": "v0.299.2" }, "meta_ads_options": { "description": "Meta Marketing (Meta Ads) specific options for ingestion", - "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.MetaMarketingOptions" + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.MetaMarketingOptions", + "x-since-version": "v0.299.2" }, "outlook_options": { "description": "Outlook specific options for ingestion", "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.OutlookOptions", "x-databricks-preview": "PRIVATE", - "doNotSuggest": true + "doNotSuggest": true, + "x-since-version": "v0.299.2" }, "sharepoint_options": { "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.SharepointOptions", @@ -6851,7 +6858,8 @@ "description": "Smartsheet specific options for ingestion", "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.SmartsheetOptions", "x-databricks-preview": "PRIVATE", - "doNotSuggest": true + "doNotSuggest": true, + "x-since-version": "v0.299.2" }, "tiktok_ads_options": { "description": "TikTok Ads specific options for ingestion", @@ -6864,7 +6872,8 @@ "description": "Zendesk Support specific options for ingestion", "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.ZendeskSupportOptions", "x-databricks-preview": "PRIVATE", - "doNotSuggest": true + "doNotSuggest": true, + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -7092,7 +7101,8 @@ "properties": { "manager_account_id": { "description": "(Required) Manager Account ID (also called MCC Account ID) used to list and access\ncustomer accounts under this manager account. This is required for fetching the list\nof customer accounts during source selection.\nIf the same field is also set in the object-level GoogleAdsOptions (connector_options),\nthe object-level value takes precedence over this top-level config.", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -7451,7 +7461,8 @@ "properties": { "include_jira_spaces": { "description": "(Optional) Projects to filter Jira data on", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -7466,35 +7477,43 @@ "properties": { "action_attribution_windows": { "description": "(Optional) Action attribution windows for insights reporting (e.g. \"28d_click\", \"1d_view\")", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "action_breakdowns": { "description": "(Optional) Action breakdowns to configure for data aggregation", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "action_report_time": { "description": "(Optional) Timing used to report action statistics (impression, conversion, mixed, or lifetime)", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" }, "breakdowns": { "description": "(Optional) Breakdowns to configure for data aggregation", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "custom_insights_lookback_window": { "description": "(Optional) Window in days to revisit data during sync to capture\nupdated conversion data from the API.", - "$ref": "#/$defs/int" + "$ref": "#/$defs/int", + "x-since-version": "v0.299.2" }, "level": { "description": "(Optional) Granularity of data to pull (account, ad, adset, campaign)", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" }, "start_date": { "description": "(Optional) Start date in yyyy-MM-dd format (e.g. 2025-01-15). Data added\nafter this date will be ingested", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" }, "time_increment": { "description": "(Optional) Value in string by which to aggregate statistics (can take all_days, monthly or number of days)", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -7575,48 +7594,58 @@ "properties": { "attachment_mode": { "description": "(Optional) Controls which attachments to ingest.\nIf not specified, defaults to ALL.", - "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.OutlookAttachmentMode" + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.OutlookAttachmentMode", + "x-since-version": "v0.299.2" }, "body_format": { "description": "(Optional) Defines how the body_content column is populated.\nTEXT_HTML: Preserves full formatting, links, and styling.\nTEXT_PLAIN: Converts body to plain text. Recommended for AI/RAG pipelines to reduce token usage and noise.", - "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.OutlookBodyFormat" + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.OutlookBodyFormat", + "x-since-version": "v0.299.2" }, "folder_filter": { "description": "Deprecated. Use include_folders instead.", "$ref": "#/$defs/slice/string", "deprecationMessage": "This field is deprecated", + "x-since-version": "v0.299.2", "deprecated": true }, "include_folders": { "description": "(Optional) Filter mail folders to include in the sync.\nIf not specified, all folders will be synced.\nExamples: Inbox, Sent Items, Custom_Folder\nFilter semantics: OR between different folders.", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "include_mailboxes": { "description": "(Optional) List of mailboxes to sync (e.g. mailbox email addresses or identifiers).\nIf not specified, all accessible mailboxes are ingested.\nFilter semantics: OR between different mailboxes.", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "include_senders": { "description": "(Optional) Filter emails by sender address. Uses exact email match.\nExamples: user@vendor.com, alerts@system.io, noreply@company.com\nIf not specified, emails from all senders will be synced.\nFilter semantics: OR between different senders.", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "include_subjects": { "description": "(Optional) Filter emails by subject line. Values ending with \"*\" use prefix match (subject starts with\nthe part before \"*\"); otherwise substring match (subject contains the value).\nExamples: \"Invoice\" (substring), \"Re:*\" (prefix), \"Support Ticket\", \"URGENT*\"\nIf not specified, emails with all subjects will be synced.\nFilter semantics: OR between different subjects.", - "$ref": "#/$defs/slice/string" + "$ref": "#/$defs/slice/string", + "x-since-version": "v0.299.2" }, "sender_filter": { "description": "Deprecated. Use include_senders instead.", "$ref": "#/$defs/slice/string", "deprecationMessage": "This field is deprecated", + "x-since-version": "v0.299.2", "deprecated": true }, "start_date": { "description": "(Optional) Start date for the initial sync in YYYY-MM-DD format.\nFormat: YYYY-MM-DD (e.g., 2024-01-01)\nThis determines the earliest date from which to sync historical data.\nIf not specified, complete history is ingested.", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" }, "subject_filter": { "description": "Deprecated. Use include_subjects instead.", "$ref": "#/$defs/slice/string", "deprecationMessage": "This field is deprecated", + "x-since-version": "v0.299.2", "deprecated": true } }, @@ -8054,7 +8083,8 @@ "properties": { "enforce_schema": { "description": "(Optional) When true, maps each column to its Smartsheet-declared type (Text/Number/Date/\nCheckbox/etc.). Cells that do not conform to the declared type are set to NULL.\nWhen false, all columns land as STRING. Use false for sheets with irregular data or columns\nthat frequently violate their own declared type.\nIf not specified, defaults to true.", - "$ref": "#/$defs/bool" + "$ref": "#/$defs/bool", + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -8087,7 +8117,8 @@ "google_ads_config": { "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.GoogleAdsConfig", "x-databricks-preview": "PRIVATE", - "doNotSuggest": true + "doNotSuggest": true, + "x-since-version": "v0.299.2" } }, "additionalProperties": false @@ -8281,7 +8312,8 @@ "properties": { "start_date": { "description": "(Optional) Start date in YYYY-MM-DD format for the initial sync.\nThis determines the earliest date from which to sync historical data.", - "$ref": "#/$defs/string" + "$ref": "#/$defs/string", + "x-since-version": "v0.299.2" } }, "additionalProperties": false diff --git a/bundle/statemgmt/state_load_test.go b/bundle/statemgmt/state_load_test.go index 097b708e62..000a753fbc 100644 --- a/bundle/statemgmt/state_load_test.go +++ b/bundle/statemgmt/state_load_test.go @@ -50,6 +50,7 @@ func TestStateToBundleEmptyLocalResources(t *testing.T) { "resources.postgres_branches.test_postgres_branch": {ID: "projects/test-project/branches/main"}, "resources.postgres_endpoints.test_postgres_endpoint": {ID: "projects/test-project/branches/main/endpoints/primary"}, "resources.postgres_catalogs.test_postgres_catalog": {ID: "catalogs/test_catalog"}, + "resources.postgres_synced_tables.test_postgres_synced_table": {ID: "synced_tables/main.public.test_synced_table"}, "resources.vector_search_endpoints.test_vector_search_endpoint": {ID: "vs-endpoint-1"}, } err := StateToBundle(t.Context(), state, &config) @@ -303,6 +304,13 @@ func TestStateToBundleEmptyRemoteResources(t *testing.T) { }, }, }, + PostgresSyncedTables: map[string]*resources.PostgresSyncedTable{ + "test_postgres_synced_table": { + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "main.public.test_synced_table", + }, + }, + }, VectorSearchEndpoints: map[string]*resources.VectorSearchEndpoint{ "test_vector_search_endpoint": { CreateEndpoint: vectorsearch.CreateEndpoint{ @@ -687,6 +695,13 @@ func TestStateToBundleModifiedResources(t *testing.T) { }, }, }, + PostgresSyncedTables: map[string]*resources.PostgresSyncedTable{ + "test_postgres_synced_table": { + PostgresSyncedTableConfig: resources.PostgresSyncedTableConfig{ + SyncedTableId: "main.public.test_synced_table", + }, + }, + }, VectorSearchEndpoints: map[string]*resources.VectorSearchEndpoint{ "test_vector_search_endpoint": { CreateEndpoint: vectorsearch.CreateEndpoint{ diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index d00a5039b8..4870e25e07 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -168,11 +168,12 @@ type FakeWorkspace struct { DatabaseCatalogs map[string]database.DatabaseCatalog SyncedDatabaseTables map[string]database.SyncedDatabaseTable - PostgresProjects map[string]postgres.Project - PostgresBranches map[string]postgres.Branch - PostgresEndpoints map[string]postgres.Endpoint - PostgresCatalogs map[string]postgres.Catalog - PostgresOperations map[string]postgres.Operation + PostgresProjects map[string]postgres.Project + PostgresBranches map[string]postgres.Branch + PostgresEndpoints map[string]postgres.Endpoint + PostgresCatalogs map[string]postgres.Catalog + PostgresSyncedTables map[string]postgres.SyncedTable + PostgresOperations map[string]postgres.Operation // Branches and endpoints that the server provisioned implicitly together // with their parent (e.g. the production branch on a new project, or the @@ -308,6 +309,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresBranches: map[string]postgres.Branch{}, PostgresEndpoints: map[string]postgres.Endpoint{}, PostgresCatalogs: map[string]postgres.Catalog{}, + PostgresSyncedTables: map[string]postgres.SyncedTable{}, PostgresOperations: map[string]postgres.Operation{}, postgresImplicitBranches: map[string]bool{}, postgresImplicitEndpoints: map[string]bool{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index c78fd2b3ad..e0e1e87318 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -959,6 +959,30 @@ func AddDefaultHandlers(server *Server) { return req.Workspace.PostgresOperationGet(name) }) + // Postgres Synced Tables: + server.Handle("POST", "/api/2.0/postgres/synced_tables", func(req Request) any { + syncedTableID := req.URL.Query().Get("synced_table_id") + return req.Workspace.PostgresSyncedTableCreate(req, syncedTableID) + }) + + server.Handle("GET", "/api/2.0/postgres/synced_tables/{id}", func(req Request) any { + return req.Workspace.PostgresSyncedTableGet("synced_tables/" + req.Vars["id"]) + }) + + server.Handle("DELETE", "/api/2.0/postgres/synced_tables/{id}", func(req Request) any { + return req.Workspace.PostgresSyncedTableDelete("synced_tables/" + req.Vars["id"]) + }) + + server.Handle("GET", "/api/2.0/postgres/synced_tables/{id}/operations/{operation_id}", func(req Request) any { + name := "synced_tables/" + req.Vars["id"] + "/operations/" + req.Vars["operation_id"] + return req.Workspace.PostgresOperationGet(name) + }) + + server.Handle("GET", "/api/2.0/postgres/operations/{operation_id}", func(req Request) any { + return req.Workspace.PostgresOperationGet("operations/" + req.Vars["operation_id"]) + }) + + // Catch-all handler for invalid postgres resource names. // This handles cases like GET /api/2.0/postgres/1234 where "1234" is not a valid resource name. server.Handle("GET", "/api/2.0/postgres/{name}", func(req Request) any { diff --git a/libs/testserver/postgres.go b/libs/testserver/postgres.go index e720f00701..f30c5d1cad 100644 --- a/libs/testserver/postgres.go +++ b/libs/testserver/postgres.go @@ -786,6 +786,8 @@ func (s *FakeWorkspace) createOperationLocked(resourceName string, response any) switch { case strings.HasPrefix(resourceName, "catalogs/"): resourceType = "Catalog" + case strings.HasPrefix(resourceName, "synced_tables/"): + resourceType = "SyncedTable" case strings.Contains(resourceName, "/endpoints/"): resourceType = "Endpoint" case strings.Contains(resourceName, "/branches/"): @@ -809,6 +811,67 @@ func (s *FakeWorkspace) createOperationLocked(resourceName string, response any) return op } +// PostgresSyncedTableCreate creates a new postgres synced table. +func (s *FakeWorkspace) PostgresSyncedTableCreate(req Request, syncedTableID string) Response { + defer s.LockUnlock()() + + if syncedTableID == "" { + return postgresErrorResponse(400, "INVALID_PARAMETER_VALUE", `Field 'synced_table_id' is required, expected non-default value (not "")!`) + } + + var table postgres.SyncedTable + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &table); err != nil { + return Response{ + StatusCode: 400, + Body: fmt.Sprintf("cannot unmarshal request body: %v", err), + } + } + } + + name := "synced_tables/" + syncedTableID + + if _, exists := s.PostgresSyncedTables[name]; exists { + return postgresErrorResponse(409, "ALREADY_EXISTS", "synced table with such id already exists") + } + table.Name = name + table.Uid = nextUUID() + table.CreateTime = nowTime() + + // GET on the real API returns only status; clear spec to match. + table.Spec = nil + table.Status = &postgres.SyncedTableSyncedTableStatus{ + DetailedState: postgres.SyncedTableStateSyncedTableOnline, + UnityCatalogProvisioningState: postgres.ProvisioningInfoStateActive, + } + + s.PostgresSyncedTables[name] = table + + return Response{Body: s.createOperationLocked(name, table)} +} + +// PostgresSyncedTableGet retrieves a postgres synced table by name. +func (s *FakeWorkspace) PostgresSyncedTableGet(name string) Response { + defer s.LockUnlock()() + + table, exists := s.PostgresSyncedTables[name] + if !exists { + return postgresNotFoundResponse("synced table") + } + return Response{Body: table} +} + +// PostgresSyncedTableDelete deletes a postgres synced table. +func (s *FakeWorkspace) PostgresSyncedTableDelete(name string) Response { + defer s.LockUnlock()() + + if _, exists := s.PostgresSyncedTables[name]; !exists { + return postgresNotFoundResponse("synced table") + } + delete(s.PostgresSyncedTables, name) + return Response{Body: s.createOperationLocked(name, nil)} +} + // createDefaultBranchLocked creates a default branch for a project (caller must hold lock). // The default branch is named "production" to match cloud API behavior. func (s *FakeWorkspace) createDefaultBranchLocked(projectName string) {