Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repos:
args: ["--profile", "black"]

- repo: https://github.com/pycqa/flake8
rev: 6.0.0
rev: 7.1.1
hooks:
- id: flake8

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/dbt_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ models:
+file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}"

flags:
require_batched_execution_for_custom_microbatch_strategy: True
require_batched_execution_for_custom_microbatch_strategy: True
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from contextlib import contextmanager

import pytest

from dbt_project import DbtProject


Expand Down Expand Up @@ -44,20 +43,30 @@ def _microbatch_model_sql(source_model_name: str) -> str:
amount,
order_date
from {{ ref('__MICROBATCH_SOURCE_MODEL__') }}
""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name)
""".replace(
"__MICROBATCH_SOURCE_MODEL__", source_model_name
)


@contextmanager
def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str):
source_model_name = f"mb_src_{model_suffix}"
target_model_name = f"mb_tgt_{model_suffix}"
source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql")
target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql")
source_model_path = dbt_project.tmp_models_dir_path.joinpath(
f"{source_model_name}.sql"
)
target_model_path = dbt_project.tmp_models_dir_path.joinpath(
f"{target_model_name}.sql"
)

source_model_path.write_text(_microbatch_source_model_sql())
target_model_path.write_text(_microbatch_model_sql(source_model_name))
relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path)
relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path)
relative_source_model_path = source_model_path.relative_to(
dbt_project.project_dir_path
)
relative_target_model_path = target_model_path.relative_to(
dbt_project.project_dir_path
)
try:
yield relative_source_model_path, relative_target_model_path, target_model_name
finally:
Expand All @@ -75,9 +84,7 @@ def _run_microbatch_model_and_get_latest_success_result(
model_path,
target_model_name,
):
dbt_project.dbt_runner.run(
select=f"{source_model_path} {model_path}"
)
dbt_project.dbt_runner.run(select=f"{source_model_path} {model_path}")

unique_id = f"model.elementary_tests.{target_model_name}"
run_results = dbt_project.read_table(
Expand All @@ -91,14 +98,14 @@ def _run_microbatch_model_and_get_latest_success_result(

@contextmanager
def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
macro_path = (
dbt_project.project_dir_path / "macros" / "microbatch.sql"
)
macro_path = dbt_project.project_dir_path / "macros" / "microbatch.sql"
macro_sql = """
{% macro __MACRO_NAME__(arg_dict) %}
{{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
{% endmacro %}
""".replace("__MACRO_NAME__", macro_name)
""".replace(
"__MACRO_NAME__", macro_name
)
if macro_path.exists():
raise FileExistsError(f"Expected no macro file at {macro_path}")

Expand All @@ -110,7 +117,9 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
macro_path.unlink()


@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"])
@pytest.mark.skip_targets(
["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"]
)
@pytest.mark.skip_for_dbt_fusion
@pytest.mark.parametrize(
"macro_name,expected_compiled_code,model_suffix",
Expand All @@ -134,10 +143,10 @@ def test_microbatch_run_results_compiled_code_behavior(
)
assert run_results, "Expected a successful run result row for microbatch model"
if expected_compiled_code:
assert run_results[0]["compiled_code"], (
"Expected compiled_code to be populated when override macro is present"
)
assert run_results[0][
"compiled_code"
], "Expected compiled_code to be populated when override macro is present"
else:
assert not run_results[0]["compiled_code"], (
"Expected compiled_code to stay empty when override macro is absent"
)
assert not run_results[0][
"compiled_code"
], "Expected compiled_code to stay empty when override macro is absent"
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
model.get("unique_id") if model is mapping else model.unique_id
) | default(none, true) %}
{% set model_compiled_code = (
model.get("compiled_code") if model is mapping else model.compiled_code
model.get("compiled_code")
if model is mapping
else model.compiled_code
) | default(none, true) %}
{% if model_unique_id is none %}
{{ return(none) }}
{% endif %}
{% if not model_compiled_code %}
{{ return(none) }}
{% endif %}
{% if model_unique_id is none %} {{ return(none) }} {% endif %}
{% if not model_compiled_code %} {{ return(none) }} {% endif %}

{% set compiled_code_by_unique_id = elementary.get_cache(
"microbatch_compiled_code_by_unique_id"
Expand Down
4 changes: 3 additions & 1 deletion macros/utils/graph/get_compiled_code.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"microbatch_compiled_code_by_unique_id", {}
).get(node.get("unique_id")) %}
{% endif %}
{% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %}
{% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(
compiled_code
) %}

{% set max_column_size = elementary.get_column_size() %}
{% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %}
Expand Down
Loading