From afcdaaebf1d3d9c16e79ce03fb5f3b3f62bf93ce Mon Sep 17 00:00:00 2001 From: Namrata Madan Date: Fri, 1 May 2026 13:28:52 -0700 Subject: [PATCH] fix: create asymmetric validation key in step compiler --- .../src/sagemaker/core/experiments/_utils.py | 12 +- .../remote_function/core/serialization.py | 47 - .../tests/integ/remote_function/conftest.py | 319 +----- .../integ/remote_function/test_decorator.py | 22 + .../mlops/workflow/_steps_compiler.py | 12 +- .../src/sagemaker/mlops/workflow/pipeline.py | 36 +- .../tests/data/workflow/config.yaml | 10 + .../tests/data/workflow/dummy_input.txt | 0 .../tests/data/workflow/dummy_script.py | 15 + .../tests/data/workflow/pre_exec_commands | 4 + .../tests/data/workflow/requirements.txt | 1 + sagemaker-mlops/tests/integ/conftest.py | 232 +++- .../tests/integ/workflow/helpers.py | 120 +++ .../test_pipeline_train_registry.py | 8 +- .../integ/workflow/test_step_decorator.py | 995 ++++++++++++++++++ .../workflow/workdir_helpers/__init__.py | 12 + .../workflow/workdir_helpers/local_module.py | 17 + .../workdir_helpers/nested_helper/__init__.py | 12 + .../nested_helper/local_module2.py | 17 + .../tests/unit/workflow/test_pipeline.py | 10 +- tests/__init__.py | 12 + tests/integ_helpers/__init__.py | 12 + tests/integ_helpers/container_build.py | 294 ++++++ 23 files changed, 1870 insertions(+), 349 deletions(-) create mode 100644 sagemaker-mlops/tests/data/workflow/config.yaml create mode 100644 sagemaker-mlops/tests/data/workflow/dummy_input.txt create mode 100644 sagemaker-mlops/tests/data/workflow/dummy_script.py create mode 100644 sagemaker-mlops/tests/data/workflow/pre_exec_commands create mode 100644 sagemaker-mlops/tests/data/workflow/requirements.txt create mode 100644 sagemaker-mlops/tests/integ/workflow/helpers.py rename sagemaker-mlops/tests/integ/{ => workflow}/test_pipeline_train_registry.py (97%) create mode 100644 sagemaker-mlops/tests/integ/workflow/test_step_decorator.py create mode 100644 sagemaker-mlops/tests/integ/workflow/workdir_helpers/__init__.py create mode 100644 sagemaker-mlops/tests/integ/workflow/workdir_helpers/local_module.py create mode 100644 sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/__init__.py create mode 100644 sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/local_module2.py create mode 100644 tests/__init__.py create mode 100644 tests/integ_helpers/__init__.py create mode 100644 tests/integ_helpers/container_build.py diff --git a/sagemaker-core/src/sagemaker/core/experiments/_utils.py b/sagemaker-core/src/sagemaker/core/experiments/_utils.py index 7be530414d..fb11bef961 100644 --- a/sagemaker-core/src/sagemaker/core/experiments/_utils.py +++ b/sagemaker-core/src/sagemaker/core/experiments/_utils.py @@ -118,17 +118,23 @@ def get_tc_and_exp_config_from_job_env( job_name = environment.source_arn.split("/")[-1] if environment.environment_type == _EnvironmentType.SageMakerTrainingJob: job_response = retry_with_backoff( - callable_func=lambda: sagemaker_session.describe_training_job(job_name), + callable_func=lambda: sagemaker_session.sagemaker_client.describe_training_job( + TrainingJobName=job_name + ), num_attempts=4, ) elif environment.environment_type == _EnvironmentType.SageMakerProcessingJob: job_response = retry_with_backoff( - callable_func=lambda: sagemaker_session.describe_processing_job(job_name), + callable_func=lambda: sagemaker_session.sagemaker_client.describe_processing_job( + ProcessingJobName=job_name + ), num_attempts=4, ) else: # environment.environment_type == 
_EnvironmentType.SageMakerTransformJob job_response = retry_with_backoff( - callable_func=lambda: sagemaker_session.describe_transform_job(job_name), + callable_func=lambda: sagemaker_session.sagemaker_client.describe_transform_job( + TransformJobName=job_name + ), num_attempts=4, ) diff --git a/sagemaker-core/src/sagemaker/core/remote_function/core/serialization.py b/sagemaker-core/src/sagemaker/core/remote_function/core/serialization.py index 28ed6061e4..3091fdb0fb 100644 --- a/sagemaker-core/src/sagemaker/core/remote_function/core/serialization.py +++ b/sagemaker-core/src/sagemaker/core/remote_function/core/serialization.py @@ -366,34 +366,6 @@ def serialize_exception_to_s3( ) -def _upload_payload_and_metadata_to_s3( - bytes_to_upload: Union[bytes, io.BytesIO], - hmac_key: str, - s3_uri: str, - sagemaker_session: Session, - s3_kms_key, -): - """Uploads serialized payload and metadata to s3. - - Args: - bytes_to_upload (bytes): Serialized bytes to upload. - hmac_key (str): Key used to calculate hmac-sha256 hash of the serialized obj. - s3_uri (str): S3 root uri to which resulting serialized artifacts will be uploaded. - sagemaker_session (sagemaker.core.helper.session.Session): - The underlying Boto3 session which AWS service calls are delegated to. - s3_kms_key (str): KMS key used to encrypt artifacts uploaded to S3. - """ - _upload_bytes_to_s3(bytes_to_upload, f"{s3_uri}/payload.pkl", s3_kms_key, sagemaker_session) - - sha256_hash = _compute_hash(bytes_to_upload, secret_key=hmac_key) - - _upload_bytes_to_s3( - _MetaData(sha256_hash).to_json(), - f"{s3_uri}/metadata.json", - s3_kms_key, - sagemaker_session, - ) - def _upload_payload_and_metadata_to_s3_signed( bytes_to_upload: Union[bytes, io.BytesIO], private_key: ec.EllipticCurvePrivateKey, @@ -561,22 +533,3 @@ def _verify_sha256_hash(expected_hash: str, buffer: bytes): "Integrity check for the serialized function or data failed. " "Please restrict access to your S3 bucket" ) - - -def _compute_hash(buffer: bytes, secret_key: str) -> str: - """Compute the hmac-sha256 hash""" - return hmac.new(secret_key.encode(), msg=buffer, digestmod=hashlib.sha256).hexdigest() - - -def _perform_integrity_check(expected_hash_value: str, secret_key: str, buffer: bytes): - """Performs integrity checks for serialized code/arguments uploaded to s3. - - Verifies whether the hash read from s3 matches the hash calculated - during remote function execution. - """ - actual_hash_value = _compute_hash(buffer=buffer, secret_key=secret_key) - if not hmac.compare_digest(expected_hash_value, actual_hash_value): - raise DeserializationError( - "Integrity check for the serialized function or data failed. " - "Please restrict access to your S3 bucket" - ) diff --git a/sagemaker-core/tests/integ/remote_function/conftest.py b/sagemaker-core/tests/integ/remote_function/conftest.py index 706376ea58..46d0643ed1 100644 --- a/sagemaker-core/tests/integ/remote_function/conftest.py +++ b/sagemaker-core/tests/integ/remote_function/conftest.py @@ -12,78 +12,30 @@ # language governing permissions and limitations under the License. 
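For context on the serialization.py hunk above: the removed helpers implemented the old shared-secret HMAC integrity check, while the retained _upload_payload_and_metadata_to_s3_signed path signs payloads with an EC key instead. A minimal sketch of that asymmetric flow using the cryptography package (S3 upload and key distribution are omitted; everything other than the ec/hashes/serialization APIs is illustrative):

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec

# Client side: generate a key pair and sign the serialized payload.
private_key = ec.generate_private_key(ec.SECP256R1())
payload = b"cloudpickled function and arguments"  # stand-in for the real payload
signature = private_key.sign(payload, ec.ECDSA(hashes.SHA256()))

# Only the public half is handed to the job, e.g. PEM-encoded in an env var.
public_pem = private_key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

# Job side: verify() raises InvalidSignature if the payload was tampered with.
verifier = serialization.load_pem_public_key(public_pem)
try:
    verifier.verify(signature, payload, ec.ECDSA(hashes.SHA256()))
except InvalidSignature:
    raise RuntimeError("integrity check for the serialized payload failed")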
from __future__ import absolute_import -import base64 import os import re import shutil -import subprocess import sys -from contextlib import contextmanager +import importlib.util as _importlib_util +import os as _os -import docker -import filelock import pytest -from docker.errors import BuildError -from sagemaker.core.common_utils import sagemaker_timestamp - -REPO_ACCOUNT_ID = "033110030271" - -REPO_NAME = "remote-function-dummy-container" - -DOCKERFILE_TEMPLATE = ( - "FROM public.ecr.aws/docker/library/python:{py_version}-slim\n\n" - "RUN apt-get update -y \ - && apt-get install -y unzip curl\n\n" - "RUN curl 'https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip'" - " -o 'awscliv2.zip' \ - && unzip awscliv2.zip \ - && ./aws/install\n\n" - "COPY {source_archive} ./\n" - "RUN pip3 install --no-cache-dir '{source_archive}'\n" - "RUN rm {source_archive}\n" -) - -DOCKERFILE_TEMPLATE_WITH_CONDA = ( - "FROM public.ecr.aws/docker/library/python:{py_version}-slim\n\n" - 'SHELL ["/bin/bash", "-c"]\n' - "RUN apt-get update -y \ - && apt-get install -y unzip curl\n\n" - "RUN curl -L -O " - "'https://github.com/conda-forge/miniforge/releases/download/24.11.3-2/" - "Miniforge3-Linux-x86_64.sh' \ - && bash Miniforge3-Linux-x86_64.sh -b -p '/opt/conda' \ - && /opt/conda/bin/conda init bash\n\n" - "ENV PATH $PATH:/opt/conda/bin\n" - "RUN mamba create -n integ_test_env python={py_version} -y \ - && mamba create -n default_env python={py_version} -y\n" - "COPY {source_archive} ./\n" - "RUN pip install --no-cache-dir '{source_archive}' \ - && mamba run -n base pip install --no-cache-dir '{source_archive}' \ - && mamba run -n default_env pip install --no-cache-dir '{source_archive}' \ - && mamba run -n integ_test_env pip install --no-cache-dir '{source_archive}'\n" - "ENV SHELL=/bin/bash\n" - "ENV SAGEMAKER_JOB_CONDA_ENV=default_env\n" +# --------------------------------------------------------------------------- +# Shared container-build helpers (file-locked, xdist-safe) +# --------------------------------------------------------------------------- +_container_build_path = _os.path.abspath( + _os.path.join(_os.path.dirname(__file__), "..", "..", "..", "..", "tests", "integ_helpers", "container_build.py") ) +_spec = _importlib_util.spec_from_file_location("integ_helpers.container_build", _container_build_path) +_container_build = _importlib_util.module_from_spec(_spec) +_spec.loader.exec_module(_container_build) -DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR = ( - "FROM public.ecr.aws/docker/library/python:{py_version}-slim\n\n" - "RUN apt-get update -y \ - && apt-get install -y unzip curl\n\n" - "RUN curl 'https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip'" - " -o 'awscliv2.zip' \ - && unzip awscliv2.zip \ - && ./aws/install\n\n" - "RUN apt install -y sudo\n" - "RUN useradd -ms /bin/bash integ-test-user\n" - "RUN usermod -aG sudo integ-test-user\n" - "RUN echo '%sudo ALL= (ALL) NOPASSWD:ALL' >> /etc/sudoers\n" - "USER integ-test-user\n" - "WORKDIR /home/integ-test-user\n" - "COPY {source_archive} ./\n" - "RUN pip install --no-cache-dir '{source_archive}'\n" - "RUN rm {source_archive}\n" -) +DOCKERFILE_TEMPLATE = _container_build.DOCKERFILE_TEMPLATE +DOCKERFILE_TEMPLATE_WITH_CONDA = _container_build.DOCKERFILE_TEMPLATE_WITH_CONDA +DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR = _container_build.DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR +build_sdk_tar_once = _container_build.build_sdk_tar_once +build_container_once = _container_build.build_container_once AUTO_CAPTURE_CLIENT_DOCKER_TEMPLATE = ( "FROM 
public.ecr.aws/docker/library/python:{py_version}-slim\n\n" @@ -150,50 +102,71 @@ def gpu_instance_type(): @pytest.fixture(scope="session") -def dummy_container_without_error(sagemaker_session, compatible_python_version, sagemaker_sdk_tar_path, tmp_path_factory): - return _build_container_once( - "dummy_container_without_error", sagemaker_session, compatible_python_version, +def sagemaker_sdk_tar_path(tmp_path_factory): + """Build the sagemaker-core sdist exactly once across all xdist workers.""" + return build_sdk_tar_once(tmp_path_factory) + + +@pytest.fixture(scope="session") +def dummy_container_without_error(sagemaker_session, compatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_without_error", + sagemaker_session, compatible_python_version, DOCKERFILE_TEMPLATE, sagemaker_sdk_tar_path, tmp_path_factory, ) @pytest.fixture(scope="session") -def dummy_container_with_user_and_workdir(sagemaker_session, compatible_python_version, sagemaker_sdk_tar_path, tmp_path_factory): - return _build_container_once( - "dummy_container_with_user_and_workdir", sagemaker_session, compatible_python_version, +def dummy_container_with_user_and_workdir(sagemaker_session, compatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_with_user_and_workdir", + sagemaker_session, compatible_python_version, DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR, sagemaker_sdk_tar_path, tmp_path_factory, ) @pytest.fixture(scope="session") -def dummy_container_incompatible_python_runtime(sagemaker_session, incompatible_python_version, sagemaker_sdk_tar_path, tmp_path_factory): - return _build_container_once( - "dummy_container_incompatible_python_runtime", sagemaker_session, incompatible_python_version, +def dummy_container_incompatible_python_runtime(sagemaker_session, incompatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_incompatible_python_runtime", + sagemaker_session, incompatible_python_version, DOCKERFILE_TEMPLATE, sagemaker_sdk_tar_path, tmp_path_factory, ) @pytest.fixture(scope="session") -def dummy_container_with_conda(sagemaker_session, compatible_python_version, sagemaker_sdk_tar_path, tmp_path_factory): - return _build_container_once( - "dummy_container_with_conda", sagemaker_session, compatible_python_version, +def dummy_container_with_conda(sagemaker_session, compatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_with_conda", + sagemaker_session, compatible_python_version, DOCKERFILE_TEMPLATE_WITH_CONDA, sagemaker_sdk_tar_path, tmp_path_factory, ) @pytest.fixture(scope="session") def auto_capture_test_container(sagemaker_session, sagemaker_sdk_tar_path, tmp_path_factory): - return _build_container_once( - "auto_capture_test_container", sagemaker_session, "3.10", + def _copy_auto_capture_test_file(tmpdir): + source_path = os.path.join(os.path.dirname(__file__), "test_auto_capture.py") + shutil.copy2(source_path, os.path.join(tmpdir, "test_auto_capture.py")) + + return build_container_once( + "auto_capture_test_container", + sagemaker_session, "3.10", AUTO_CAPTURE_CLIENT_DOCKER_TEMPLATE, sagemaker_sdk_tar_path, tmp_path_factory, is_auto_capture=True, + extra_files_hook=_copy_auto_capture_test_file, ) @pytest.fixture(scope="session") def spark_test_container(sagemaker_session, sagemaker_sdk_tar_path, tmp_path_factory): - return _build_container_once( - 
"spark_test_container", sagemaker_session, "3.9", + return build_container_once( + "spark_test_container", + sagemaker_session, "3.9", DOCKERFILE_TEMPLATE, sagemaker_sdk_tar_path, tmp_path_factory, ) @@ -216,190 +189,4 @@ def conda_env_yml(): os.remove(conda_yml_file_name) -@pytest.fixture(scope="session") -def sagemaker_sdk_tar_path(tmp_path_factory): - """Build the sagemaker-core sdist once and share it across all xdist workers. - - Uses a file lock so only one worker runs the build; others wait and reuse - the already-built tar.gz from the shared temp directory. - """ - # tmp_path_factory.getbasetemp().parent is shared across all xdist workers - root_tmp = tmp_path_factory.getbasetemp().parent - tar_dir = root_tmp / "sagemaker_sdk_tar" - tar_dir.mkdir(exist_ok=True) - lock_file = root_tmp / "sagemaker_sdk_tar.lock" - - with filelock.FileLock(str(lock_file)): - existing = list(tar_dir.glob("*.tar.gz")) - if not existing: - _generate_sagemaker_sdk_tar(str(tar_dir)) - existing = list(tar_dir.glob("*.tar.gz")) - return str(existing[0]) - - -def _tmpdir(): - """Create a temporary directory context manager.""" - import tempfile - - tmpdir = tempfile.mkdtemp() - try: - yield tmpdir - finally: - shutil.rmtree(tmpdir) - - -_tmpdir = contextmanager(_tmpdir) - - -def _build_container_once( - fixture_name, sagemaker_session, py_version, docker_template, sdk_tar_path, - tmp_path_factory, is_auto_capture=False, -): - """Build and push a container image exactly once across all xdist workers. - - Uses a file lock keyed by fixture_name so parallel workers wait for the - first worker to finish, then reuse the ECR URI written to a shared file. - """ - root_tmp = tmp_path_factory.getbasetemp().parent - uri_file = root_tmp / f"{fixture_name}.ecr_uri" - lock_file = root_tmp / f"{fixture_name}.lock" - - with filelock.FileLock(str(lock_file)): - if uri_file.exists(): - return uri_file.read_text().strip() - if is_auto_capture: - ecr_uri = _build_auto_capture_client_container( - py_version, docker_template, sdk_tar_path - ) - else: - ecr_uri = _build_container(sagemaker_session, py_version, docker_template, sdk_tar_path) - uri_file.write_text(ecr_uri) - return ecr_uri - - -def _build_container(sagemaker_session, py_version, docker_template, sdk_tar_path): - """Build a dummy test container locally and push to ECR.""" - region = sagemaker_session.boto_region_name - image_tag = f"{py_version.replace('.', '-')}-{sagemaker_timestamp()}" - ecr_client = sagemaker_session.boto_session.client("ecr") - username, password = _ecr_login(ecr_client) - - with _tmpdir() as tmpdir: - print("building docker image locally in ", tmpdir) - source_archive = os.path.basename(sdk_tar_path) - shutil.copy2(sdk_tar_path, os.path.join(tmpdir, source_archive)) - with open(os.path.join(tmpdir, "Dockerfile"), "w") as file: - content = docker_template.format(py_version=py_version, source_archive=source_archive) - print(f"Dockerfile contents: \n{content}\n") - file.writelines(content) - - docker_client = docker.from_env() - try: - image, build_logs = docker_client.images.build( - path=tmpdir, tag=REPO_NAME, rm=True, platform="linux/amd64" - ) - except BuildError as e: - print("docker build failed!") - for line in e.build_log: - if "stream" in line: - print(line["stream"].strip()) - raise - - if _is_repository_exists(ecr_client, REPO_NAME): - from sagemaker.core.common_utils import sts_regional_endpoint - - sts_client = sagemaker_session.boto_session.client( - "sts", - region_name=region, - endpoint_url=sts_regional_endpoint(region), - ) - 
account_id = sts_client.get_caller_identity()["Account"] - ecr_image = _ecr_image_uri(account_id, region, REPO_NAME, image_tag) - else: - ecr_image = _ecr_image_uri(REPO_ACCOUNT_ID, region, REPO_NAME, image_tag) - - image.tag(ecr_image, tag=image_tag) - docker_client.images.push(ecr_image, auth_config={"username": username, "password": password}) - return ecr_image - - -def _build_auto_capture_client_container(py_version, docker_template, sdk_tar_path): - """Build a test docker container for auto_capture tests.""" - with _tmpdir() as tmpdir: - source_archive = os.path.basename(sdk_tar_path) - shutil.copy2(sdk_tar_path, os.path.join(tmpdir, source_archive)) - _move_auto_capture_test_file(tmpdir) - with open(os.path.join(tmpdir, "Dockerfile"), "w") as file: - content = docker_template.format(py_version=py_version, source_archive=source_archive) - file.writelines(content) - docker_client = docker.from_env() - image, build_logs = docker_client.images.build(path=tmpdir, tag=REPO_NAME, rm=True) - return image.id - - -def _is_repository_exists(ecr_client, repo_name): - try: - ecr_client.describe_repositories(repositoryNames=[repo_name]) - return True - except ecr_client.exceptions.RepositoryNotFoundException: - return False - - -def _ecr_login(ecr_client): - """Get login credentials for an ECR client.""" - login = ecr_client.get_authorization_token() - b64token = login["authorizationData"][0]["authorizationToken"].encode("utf-8") - username, password = base64.b64decode(b64token).decode("utf-8").split(":") - return username, password - - -def _ecr_image_uri(account, region, image_name, tag): - """Build an ECR image URI.""" - return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account, region, image_name, tag) - - -def _generate_sagemaker_sdk_tar(destination_folder): - """Run build to generate the SDK tar file.""" - command = f"python -m build --sdist -o {destination_folder}" - try: - subprocess.run(command, shell=True, check=True, capture_output=True, text=True) - except subprocess.CalledProcessError as e: - print(f"Error when building sagemaker-core sdist: {e.stderr}") - raise - destination_folder_contents = os.listdir(destination_folder) - source_archive = [f for f in destination_folder_contents if f.endswith("tar.gz")][0] - return source_archive - - -def _generate_sdk_tar_with_public_version(destination_folder): - """Generate SDK tar with public version for auto capture tests.""" - dist_folder_path = "dist" - version_path = os.path.join(os.getcwd(), "VERSION") - if not os.path.exists(version_path): - return _generate_sagemaker_sdk_tar(destination_folder) - - with open(version_path, "r+") as version_file: - dev_sagemaker_version = version_file.readline().strip() - public_sagemaker_version = re.sub("1.dev0", "0", dev_sagemaker_version) - version_file.seek(0) - version_file.write(public_sagemaker_version) - version_file.truncate() - if os.path.exists(dist_folder_path): - shutil.rmtree(dist_folder_path) - - source_archive = _generate_sagemaker_sdk_tar(destination_folder) - - with open(version_path, "r+") as version_file: - version_file.seek(0) - version_file.write(dev_sagemaker_version) - version_file.truncate() - if os.path.exists(dist_folder_path): - shutil.rmtree(dist_folder_path) - return source_archive - - -def _move_auto_capture_test_file(destination_folder): - """Move the auto capture test file to the build folder.""" - source_path = os.path.join(os.path.dirname(__file__), "test_auto_capture.py") - destination_path = os.path.join(destination_folder, "test_auto_capture.py") - 
shutil.copy2(source_path, destination_path) + diff --git a/sagemaker-core/tests/integ/remote_function/test_decorator.py b/sagemaker-core/tests/integ/remote_function/test_decorator.py index 8e1a8c061c..f4e45ae222 100644 --- a/sagemaker-core/tests/integ/remote_function/test_decorator.py +++ b/sagemaker-core/tests/integ/remote_function/test_decorator.py @@ -71,6 +71,7 @@ def test_decorator(sagemaker_session, dummy_container_without_error, cpu_instanc instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, keep_alive_period_in_seconds=60, + job_name_prefix="test-decorator", ) def divide(x, y): return x / y @@ -87,6 +88,7 @@ def test_decorated_function_raises_exception( image_uri=dummy_container_without_error, instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, + job_name_prefix="test-decorated-fn-raises", ) def divide(x, y): logging.warning(f"{x}/{y}") @@ -104,6 +106,7 @@ def test_remote_python_runtime_is_incompatible( image_uri=dummy_container_incompatible_python_runtime, instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, + job_name_prefix="test-remote-py-incompatible", ) def divide(x, y): return x / y @@ -129,6 +132,7 @@ def test_advanced_job_setting( instance_type=cpu_instance_type, s3_kms_key=s3_kms_key, sagemaker_session=sagemaker_session, + job_name_prefix="test-advanced-job-setting", ) def divide(x, y): return x / y @@ -153,6 +157,7 @@ def test_with_custom_file_filter( include_local_workdir=True, custom_file_filter=CustomFileFilter(), keep_alive_period_in_seconds=300, + job_name_prefix="test-custom-file-filter", ) def train(x): from helpers import local_module @@ -181,6 +186,7 @@ def test_with_misconfigured_custom_file_filter( # exclude critical modules custom_file_filter=CustomFileFilter(ignore_name_patterns=["helpers"]), keep_alive_period_in_seconds=300, + job_name_prefix="test-misconfig-file-filter", ) def train(x): from helpers import local_module @@ -203,6 +209,7 @@ def test_with_additional_dependencies( dependencies=dependencies_path, instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, + job_name_prefix="test-additional-deps", ) def cuberoot(x): from scipy.special import cbrt @@ -224,6 +231,7 @@ def test_additional_dependencies_with_job_conda_env( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, job_conda_env="integ_test_env", + job_name_prefix="test-deps-job-conda-env", ) def cuberoot(x): from scipy.special import cbrt @@ -244,6 +252,7 @@ def test_additional_dependencies_with_default_conda_env( dependencies=dependencies_path, instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, + job_name_prefix="test-deps-default-conda", ) def cuberoot(x): from scipy.special import cbrt @@ -265,6 +274,7 @@ def test_additional_dependencies_with_non_existent_conda_env( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, job_conda_env="non_existent_env", + job_name_prefix="test-deps-nonexist-conda", ) def cuberoot(x): from scipy.special import cbrt @@ -286,6 +296,7 @@ def test_additional_dependencies_with_conda_yml_file( sagemaker_session=sagemaker_session, job_conda_env="integ_test_env", keep_alive_period_in_seconds=120, + job_name_prefix="test-deps-conda-yml", ) def cuberoot(x): from scipy.special import cbrt @@ -307,6 +318,7 @@ def test_with_non_existent_dependencies( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, keep_alive_period_in_seconds=30, + job_name_prefix="test-nonexist-deps", ) def divide(x, y): return x / y @@ -335,6 +347,7 @@ def 
test_decorator_pre_execution_command( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, keep_alive_period_in_seconds=60, + job_name_prefix="test-pre-exec-command", ) def get_file_content(file_names): joined_content = "" @@ -363,6 +376,7 @@ def test_decorator_pre_execution_script( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, keep_alive_period_in_seconds=60, + job_name_prefix="test-pre-exec-script", ) def get_file_content(file_names): joined_content = "" @@ -397,6 +411,7 @@ def test_decorator_pre_execution_command_error( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, keep_alive_period_in_seconds=60, + job_name_prefix="test-pre-exec-cmd-error", ) def get_file_content(file_names): joined_content = "" @@ -426,6 +441,7 @@ def test_decorator_pre_execution_script_error( instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, keep_alive_period_in_seconds=60, + job_name_prefix="test-pre-exec-script-err", ) def get_file_content(file_names): joined_content = "" @@ -450,6 +466,7 @@ def test_decorator_with_spot_instances( sagemaker_session=sagemaker_session, use_spot_instances=True, max_wait_time_in_seconds=48 * 60 * 60, + job_name_prefix="test-spot-instances", ) def divide(x, y): return x / y @@ -471,6 +488,7 @@ def test_decorator_with_spot_instances_save_and_load_checkpoints( sagemaker_session=sagemaker_session, use_spot_instances=True, max_wait_time_in_seconds=48 * 60 * 60, + job_name_prefix="test-spot-checkpoints", ) def save_checkpoints(checkpoint_path: Union[str, os.PathLike]): file_path_1 = os.path.join(checkpoint_path, "checkpoint_1.json") @@ -490,6 +508,7 @@ def save_checkpoints(checkpoint_path: Union[str, os.PathLike]): sagemaker_session=sagemaker_session, use_spot_instances=True, max_wait_time_in_seconds=48 * 60 * 60, + job_name_prefix="test-spot-checkpoints", ) def load_checkpoints(checkpoint_path: Union[str, os.PathLike]): file_path_1 = os.path.join(checkpoint_path, "checkpoint_1.json") @@ -524,6 +543,7 @@ def test_with_user_and_workdir_set_in_the_image( dependencies=dependencies_path, instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, + job_name_prefix="test-user-workdir", ) def cuberoot(x): from scipy.special import cbrt @@ -543,6 +563,7 @@ def test_with_user_and_workdir_set_in_the_image_client_error_case( image_uri=dummy_container_with_user_and_workdir, instance_type=cpu_instance_type, sagemaker_session=sagemaker_session, + job_name_prefix="test-user-workdir-err", ) def my_func(): raise RuntimeError(client_error_message) @@ -621,6 +642,7 @@ def test_decorator_torchrun( keep_alive_period_in_seconds=60, use_torchrun=use_torchrun, use_mpirun=use_mpirun, + job_name_prefix="test-torchrun", ) def divide(x, y): return x / y diff --git a/sagemaker-mlops/src/sagemaker/mlops/workflow/_steps_compiler.py b/sagemaker-mlops/src/sagemaker/mlops/workflow/_steps_compiler.py index 4e7bafcfd4..9970e2e0b3 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/workflow/_steps_compiler.py +++ b/sagemaker-mlops/src/sagemaker/mlops/workflow/_steps_compiler.py @@ -14,9 +14,10 @@ from __future__ import absolute_import import logging -import secrets from typing import Sequence, Union, List +from cryptography.hazmat.primitives.asymmetric import ec + from sagemaker.core.helper.pipeline_variable import RequestType from sagemaker.mlops.workflow.function_step import _FunctionStep from sagemaker.mlops.workflow.steps import Step, StepTypeEnum, PropertyFile @@ -188,15 +189,13 @@ def __init__( self._all_known_steps = _StepsSet() 
self._build_queue = _BuildQueue() - self._function_step_secret_token = secrets.token_hex(32) + self._function_step_secret_token = ec.generate_private_key(ec.SECP256R1()) self._build_count = 0 self._steps_need_json_serialization = set() @staticmethod - def _generate_step_map( - steps: Sequence[Union[Step, StepOutput]], step_map: dict - ): + def _generate_step_map(steps: Sequence[Union[Step, StepOutput]], step_map: dict): """Helper method to create a mapping from Step/Step Collection name to itself.""" for item in steps: if isinstance(item, StepOutput): @@ -212,7 +211,6 @@ def _generate_step_map( if isinstance(step, ConditionStep): StepsCompiler._generate_step_map(step.if_steps + step.else_steps, step_map) - def _simplify_step_list( self, step_list: List[Union[str, Step, StepOutput]], @@ -399,4 +397,4 @@ def build(self): if self._build_count > 1: raise RuntimeError("Cannot build a pipeline more than once with the same compiler.") - return self._initialize_queue_and_build(self._input_steps) \ No newline at end of file + return self._initialize_queue_and_build(self._input_steps) diff --git a/sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py b/sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py index 2493af281f..df6bfc9df7 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py +++ b/sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py @@ -1146,31 +1146,41 @@ def get_function_step_result( job_arn = step_metadata["Arn"] job_name = job_arn.split("/")[-1] - if isinstance(sagemaker_session, LocalSession): - describe_training_job_response = sagemaker_session.sagemaker_client.describe_training_job( - job_name - ) - else: - describe_training_job_response = sagemaker_session.describe_training_job(job_name) + describe_training_job_response = sagemaker_session.sagemaker_client.describe_training_job( + TrainingJobName=job_name + ) container_args = describe_training_job_response["AlgorithmSpecification"]["ContainerEntrypoint"] if container_args != JOBS_CONTAINER_ENTRYPOINT: raise ValueError(_ERROR_MSG_OF_WRONG_STEP_TYPE) s3_output_path = describe_training_job_response["OutputDataConfig"]["S3OutputPath"] - s3_uri_suffix = s3_path_join(execution_id, step_name, RESULTS_FOLDER) - if s3_output_path.endswith(s3_uri_suffix) or s3_output_path[0:-1].endswith(s3_uri_suffix): - s3_uri = s3_output_path + # S3OutputPath can be in one of three formats: + # 1. New format (pipeline step): base/step_name/build_timestamp/execution_id/results + # - S3OutputPath already points directly to the results folder + # 2. Old format (legacy, pre-build-timestamp): base/execution_id/step_name/results + # - S3OutputPath also already points to the results folder + # 3. Obsoleted format: base path only, without the results suffix + # - Must append execution_id/step_name/results for backward compatibility + # + # Cases 1 and 2 both end with RESULTS_FOLDER; case 3 does not.
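+ # + # For example (illustrative URIs): + # s3://bucket/base/mystep/2024-01-01T00-00/exec-1/results -> used as-is (case 1) + # s3://bucket/base/exec-1/mystep/results/ -> trailing slash stripped, then used as-is (case 2) + # s3://bucket/base -> exec-1/mystep/results appended (case 3)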
+ s3_output_path_stripped = s3_output_path.rstrip("/") + if s3_output_path_stripped.endswith("/" + RESULTS_FOLDER) or s3_output_path_stripped == RESULTS_FOLDER: + # S3OutputPath already points to the results folder (new or old format) + s3_uri = s3_output_path_stripped else: - # This is the obsoleted version of s3_output_path - # Keeping it for backward compatible - s3_uri = s3_path_join(s3_output_path, s3_uri_suffix) + # Obsoleted version of s3_output_path — append the suffix for backward compatibility + s3_uri = s3_path_join(s3_output_path, execution_id, step_name, RESULTS_FOLDER) job_status = describe_training_job_response["TrainingJobStatus"] if job_status == "Completed": + # Results are written by the job side using plain SHA-256 hashing (no asymmetric + # signature). The REMOTE_FUNCTION_SECRET_KEY is the public key used by the job to + # verify client-uploaded payloads (function/args), not for verifying job-uploaded + # results. Pass verification_key=None to use plain SHA-256 hash verification. return deserialize_obj_from_s3( sagemaker_session=sagemaker_session, s3_uri=s3_uri, - verification_key=describe_training_job_response["Environment"]["REMOTE_FUNCTION_SECRET_KEY"], + verification_key=None, ) raise RemoteFunctionError(_ERROR_MSG_OF_STEP_INCOMPLETE) diff --git a/sagemaker-mlops/tests/data/workflow/config.yaml b/sagemaker-mlops/tests/data/workflow/config.yaml new file mode 100644 index 0000000000..3e404c4a9d --- /dev/null +++ b/sagemaker-mlops/tests/data/workflow/config.yaml @@ -0,0 +1,10 @@ +SchemaVersion: '1.0' +SageMaker: + PythonSDK: + Modules: + RemoteFunction: + IncludeLocalWorkDir: true + CustomFileFilter: + IgnoreNamePatterns: + - "data" + - "test" diff --git a/sagemaker-mlops/tests/data/workflow/dummy_input.txt b/sagemaker-mlops/tests/data/workflow/dummy_input.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sagemaker-mlops/tests/data/workflow/dummy_script.py b/sagemaker-mlops/tests/data/workflow/dummy_script.py new file mode 100644 index 0000000000..db6337981e --- /dev/null +++ b/sagemaker-mlops/tests/data/workflow/dummy_script.py @@ -0,0 +1,15 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
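On the verification_key=None path above: result integrity then falls back to the plain content-hash check retained in serialization.py (_verify_sha256_hash). A rough equivalent of that check (a sketch; the real helper's hashing line and exception type fall outside the diff context shown):

import hashlib

def verify_plain_sha256(expected_hash: str, buffer: bytes) -> None:
    # No secret and no signature: this catches corruption of job-written
    # results, while tampering on the client-to-job path is covered by the
    # signed payloads described in the comment above.
    if hashlib.sha256(buffer).hexdigest() != expected_hash:
        raise ValueError(
            "Integrity check for the serialized function or data failed. "
            "Please restrict access to your S3 bucket"
        )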
+from __future__ import absolute_import + +print("This is the print output from dummy_script.py.") diff --git a/sagemaker-mlops/tests/data/workflow/pre_exec_commands b/sagemaker-mlops/tests/data/workflow/pre_exec_commands new file mode 100644 index 0000000000..6f9ace9f18 --- /dev/null +++ b/sagemaker-mlops/tests/data/workflow/pre_exec_commands @@ -0,0 +1,4 @@ +echo "test-content-1" > test_file_1 +echo "test-content-2" > test_file_2 +echo "test-content-3" > test_file_3 +rm ./test_file_2 \ No newline at end of file diff --git a/sagemaker-mlops/tests/data/workflow/requirements.txt b/sagemaker-mlops/tests/data/workflow/requirements.txt new file mode 100644 index 0000000000..f89caf8c2b --- /dev/null +++ b/sagemaker-mlops/tests/data/workflow/requirements.txt @@ -0,0 +1 @@ +scipy==1.13.0 diff --git a/sagemaker-mlops/tests/integ/conftest.py b/sagemaker-mlops/tests/integ/conftest.py index ca8f21119b..e15dc67827 100644 --- a/sagemaker-mlops/tests/integ/conftest.py +++ b/sagemaker-mlops/tests/integ/conftest.py @@ -1,15 +1,239 @@ -"""Shared pytest fixtures for integration tests.""" -import pytest +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Shared pytest fixtures for sagemaker-mlops integration tests.""" +from __future__ import absolute_import + +import json import os +import os as _os +import sys + +import boto3 +import pytest +from botocore.config import Config + +from sagemaker.core.helper.session_helper import Session, get_execution_role +from sagemaker.core import image_uris +from sagemaker.core.workflow.pipeline_context import PipelineSession + +# Shared container-build helpers (file-locked, xdist-safe). +# Loaded via importlib to avoid collision with the local `tests` package. 
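The shared container_build helpers pulled in here (and in the sagemaker-core conftest above) follow the same file-locked, xdist-safe pattern as the removed per-package _build_container_once. Roughly, as a sketch rather than the helper's exact code:

import filelock  # assumed available, as in the removed sagemaker-core conftest

def build_once_across_workers(tmp_path_factory, key, build_fn):
    # getbasetemp().parent is shared by every pytest-xdist worker, so a file
    # lock plus an on-disk marker makes the expensive build run exactly once;
    # later workers just read back the cached result.
    root = tmp_path_factory.getbasetemp().parent
    marker = root / f"{key}.result"
    with filelock.FileLock(str(root / f"{key}.lock")):
        if marker.exists():
            return marker.read_text().strip()
        result = build_fn()
        marker.write_text(result)
        return result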
+import importlib.util as _importlib_util + +_container_build_path = _os.path.abspath( + _os.path.join(_os.path.dirname(__file__), "..", "..", "..", "tests", "integ_helpers", "container_build.py") +) +_spec = _importlib_util.spec_from_file_location("integ_helpers.container_build", _container_build_path) +_container_build = _importlib_util.module_from_spec(_spec) +_spec.loader.exec_module(_container_build) + +DOCKERFILE_TEMPLATE = _container_build.DOCKERFILE_TEMPLATE +DOCKERFILE_TEMPLATE_WITH_CONDA = _container_build.DOCKERFILE_TEMPLATE_WITH_CONDA +DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR = _container_build.DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR +build_sdk_tar_once = _container_build.build_sdk_tar_once +build_container_once = _container_build.build_container_once + +DEFAULT_REGION = "us-east-1" +CUSTOM_S3_OBJECT_KEY_PREFIX = "session-default-prefix" + +CONDA_YML_FILE_TEMPLATE = ( + "name: integ_test_env\n" + "channels:\n" + " - defaults\n" + "dependencies:\n" + " - requests=2.32.3\n" + " - charset-normalizer=3.3.2\n" + " - scipy=1.13.1\n" + "prefix: /opt/conda/bin/conda\n" +) + + +# --------------------------------------------------------------------------- +# CLI options +# --------------------------------------------------------------------------- + +def pytest_addoption(parser): + parser.addoption("--sagemaker-client-config", action="store", default=None) + parser.addoption("--boto-config", action="store", default=None) + + +def pytest_configure(config): + bc = config.getoption("--boto-config") + parsed = json.loads(bc) if bc else {} + region = parsed.get("region_name", boto3.session.Session().region_name) + if region: + os.environ["TEST_AWS_REGION_NAME"] = region + + +# --------------------------------------------------------------------------- +# Core session fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def sagemaker_client_config(request): + config = request.config.getoption("--sagemaker-client-config") + return json.loads(config) if config else dict() + + +@pytest.fixture(scope="session") +def boto_session(request): + config = request.config.getoption("--boto-config") + if config: + return boto3.Session(**json.loads(config)) + else: + return boto3.Session(region_name=DEFAULT_REGION) + + +@pytest.fixture(scope="session") +def sagemaker_session(sagemaker_client_config, boto_session): + sagemaker_client_config.setdefault("config", Config(retries=dict(max_attempts=10))) + sagemaker_client = ( + boto_session.client("sagemaker", **sagemaker_client_config) + if sagemaker_client_config + else None + ) + return Session( + boto_session=boto_session, + sagemaker_client=sagemaker_client, + sagemaker_config={}, + default_bucket_prefix=CUSTOM_S3_OBJECT_KEY_PREFIX, + ) + + +@pytest.fixture(scope="session") +def pipeline_session(boto_session): + return PipelineSession(boto_session=boto_session) + + +# --------------------------------------------------------------------------- +# Workflow-scoped session (isolated to prevent race conditions with other tests) +# --------------------------------------------------------------------------- +@pytest.fixture(scope="module") +def sagemaker_session_for_pipeline(sagemaker_client_config, boto_session): + """Separate SageMaker session scoped to the module to avoid settings race conditions.""" + sagemaker_client_config.setdefault("config", Config(retries=dict(max_attempts=10))) + sagemaker_client = ( + boto_session.client("sagemaker", **sagemaker_client_config) + if 
sagemaker_client_config + else None + ) + return Session( + boto_session=boto_session, + sagemaker_client=sagemaker_client, + sagemaker_config={}, + default_bucket_prefix=CUSTOM_S3_OBJECT_KEY_PREFIX, + ) + + +@pytest.fixture(scope="module") +def role(sagemaker_session_for_pipeline): + return get_execution_role(sagemaker_session_for_pipeline) + + +@pytest.fixture(scope="module") +def region_name(sagemaker_session_for_pipeline): + return sagemaker_session_for_pipeline.boto_session.region_name + + +# --------------------------------------------------------------------------- +# Path fixtures +# --------------------------------------------------------------------------- @pytest.fixture(scope="session") def test_data_dir(): - """Return the path to the test data directory.""" return os.path.join(os.path.dirname(__file__), "data") @pytest.fixture(scope="session") def test_code_dir(): - """Return the path to the test code directory.""" return os.path.join(os.path.dirname(__file__), "code") + + +# --------------------------------------------------------------------------- +# Framework version fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="module") +def sklearn_latest_version(): + """Return the latest SKLearn framework version available.""" + from packaging.version import Version + config = image_uris.config_for_framework("sklearn") + if "versions" not in config: + config = next(iter(config.values())) + return sorted(config["versions"].keys(), key=lambda v: Version(v))[-1] + + +# --------------------------------------------------------------------------- +# Python version fixture +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def compatible_python_version(): + return "{}.{}".format(sys.version_info.major, sys.version_info.minor) + + +# --------------------------------------------------------------------------- +# SDK tar — built once, shared across all xdist workers via file lock +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def sagemaker_sdk_tar_path(tmp_path_factory): + """Build the sagemaker-mlops sdist exactly once across all xdist workers.""" + return build_sdk_tar_once(tmp_path_factory) + + +# --------------------------------------------------------------------------- +# Container fixtures — each image built & pushed once, ECR URI cached on disk +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def dummy_container_without_error(sagemaker_session, compatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_without_error", + sagemaker_session, compatible_python_version, + DOCKERFILE_TEMPLATE, sagemaker_sdk_tar_path, tmp_path_factory, + ) + + +@pytest.fixture(scope="session") +def dummy_container_with_user_and_workdir(sagemaker_session, compatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_with_user_and_workdir", + sagemaker_session, compatible_python_version, + DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR, sagemaker_sdk_tar_path, tmp_path_factory, + ) + + +@pytest.fixture(scope="session") +def dummy_container_with_conda(sagemaker_session, compatible_python_version, + sagemaker_sdk_tar_path, tmp_path_factory): + return build_container_once( + "dummy_container_with_conda", + sagemaker_session, 
compatible_python_version, + DOCKERFILE_TEMPLATE_WITH_CONDA, sagemaker_sdk_tar_path, tmp_path_factory, + ) + + +@pytest.fixture(scope="session") +def conda_env_yml(): + """Write a temporary conda yml file and yield its path; clean up afterwards.""" + conda_yml_file_name = "conda_env.yml" + conda_file_path = os.path.join(os.getcwd(), conda_yml_file_name) + with open(conda_file_path, "w") as yml_file: + yml_file.writelines(CONDA_YML_FILE_TEMPLATE) + yield conda_file_path + if os.path.isfile(conda_yml_file_name): + os.remove(conda_yml_file_name) diff --git a/sagemaker-mlops/tests/integ/workflow/helpers.py b/sagemaker-mlops/tests/integ/workflow/helpers.py new file mode 100644 index 0000000000..83478f4fff --- /dev/null +++ b/sagemaker-mlops/tests/integ/workflow/helpers.py @@ -0,0 +1,120 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import + +import re +from botocore.exceptions import WaiterError + +from sagemaker.core.helper.session_helper import Session +from sagemaker.mlops.workflow.pipeline import Pipeline, PipelineExecution + + +def wait_pipeline_execution(execution: PipelineExecution, delay: int = 30, max_attempts: int = 60): + try: + execution.wait(delay=delay, max_attempts=max_attempts) + except WaiterError: + pass + + +def create_and_execute_pipeline( + pipeline: Pipeline, + pipeline_name, + region_name, + role, + no_of_steps, + last_step_name_prefix, + execution_parameters, + step_status, + step_result_type=None, + step_result_value=None, + wait_duration=400, # seconds + selective_execution_config=None, +): + create_arn = None + if not selective_execution_config: + response = pipeline.create(role) + create_arn = response["PipelineArn"] + assert re.match( + rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", + create_arn, + ) + + execution = pipeline.start( + parameters=execution_parameters, selective_execution_config=selective_execution_config + ) + + if create_arn: + response = execution.describe() + assert response["PipelineArn"] == create_arn + + wait_pipeline_execution(execution=execution, delay=20, max_attempts=int(wait_duration / 20)) + + execution_steps = execution.list_steps() + + assert ( + len(execution_steps) == no_of_steps + ), f"Expected {no_of_steps}, instead found {len(execution_steps)}" + + assert last_step_name_prefix in execution_steps[0]["StepName"] + actual_status = execution_steps[0]["StepStatus"] + failure_reason = execution_steps[0].get("FailureReason", "") + assert actual_status == step_status, ( + f"Expected step status '{step_status}' but got '{actual_status}'. " + f"Step: {execution_steps[0]['StepName']}. 
" + f"FailureReason: {failure_reason or 'none'}" + ) + if step_result_type: + result = execution.result(execution_steps[0]["StepName"]) + assert isinstance( + result, step_result_type + ), f"Expected {step_result_type}, instead found {type(result)}" + + if step_result_value: + result = execution.result(execution_steps[0]["StepName"]) + assert result == step_result_value, f"Expected {step_result_value}, instead found {result}" + + if selective_execution_config: + for exe_step in execution_steps: + if exe_step["StepName"] in selective_execution_config.selected_steps: + continue + assert ( + exe_step["SelectiveExecutionResult"]["SourcePipelineExecutionArn"] + == selective_execution_config.source_pipeline_execution_arn + ) + + return execution, execution_steps + + +def validate_scheduled_pipeline_execution( + execution_arn: str, + pipeline_arn: str, + no_of_steps: int, + last_step_name: str, + status: str, + session: Session, +): + _pipeline_execution = PipelineExecution( + arn=execution_arn, + sagemaker_session=session, + ) + response = _pipeline_execution.describe() + assert response["PipelineArn"] == pipeline_arn + + wait_pipeline_execution(execution=_pipeline_execution, delay=20, max_attempts=20) + + execution_steps = _pipeline_execution.list_steps() + + assert len(execution_steps) == no_of_steps + + assert last_step_name in execution_steps[0]["StepName"] + assert execution_steps[0]["StepStatus"] == status diff --git a/sagemaker-mlops/tests/integ/test_pipeline_train_registry.py b/sagemaker-mlops/tests/integ/workflow/test_pipeline_train_registry.py similarity index 97% rename from sagemaker-mlops/tests/integ/test_pipeline_train_registry.py rename to sagemaker-mlops/tests/integ/workflow/test_pipeline_train_registry.py index 90c1eb3aaf..a7e92df694 100644 --- a/sagemaker-mlops/tests/integ/test_pipeline_train_registry.py +++ b/sagemaker-mlops/tests/integ/workflow/test_pipeline_train_registry.py @@ -37,7 +37,7 @@ def role(): return get_execution_role() -def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, role): +def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, role, sklearn_latest_version): region = sagemaker_session.boto_region_name bucket = sagemaker_session.default_bucket() prefix = "integ-test-v3-pipeline" @@ -45,7 +45,7 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r # Upload abalone data to S3 s3_client = boto3.client("s3") - abalone_path = os.path.join(os.path.dirname(__file__), "data", "pipeline", "abalone.csv") + abalone_path = os.path.join(os.path.dirname(__file__), "..", "data", "pipeline", "abalone.csv") s3_client.upload_file(abalone_path, bucket, f"{prefix}/input/abalone.csv") input_data_s3 = f"s3://{bucket}/{prefix}/input/abalone.csv" @@ -68,7 +68,7 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r image_uri=image_uris.retrieve( framework="sklearn", region=region, - version="1.2-1", + version=sklearn_latest_version, py_version="py3", instance_type="ml.m5.xlarge", ), @@ -118,7 +118,7 @@ def test_pipeline_with_train_and_registry(sagemaker_session, pipeline_session, r ), ), ], - code=os.path.join(os.path.dirname(__file__), "code", "pipeline", "preprocess.py"), + code=os.path.join(os.path.dirname(__file__), "..", "code", "pipeline", "preprocess.py"), arguments=["--input-data", input_data], ) diff --git a/sagemaker-mlops/tests/integ/workflow/test_step_decorator.py b/sagemaker-mlops/tests/integ/workflow/test_step_decorator.py new file mode 100644 index 
0000000000..a27d236575 --- /dev/null +++ b/sagemaker-mlops/tests/integ/workflow/test_step_decorator.py @@ -0,0 +1,995 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import + +import json + +import numpy +import pytest +import os +import random + +from sagemaker.core.common_utils import unique_name_from_base +from sagemaker.core.config.config import load_sagemaker_config +from sagemaker.core.remote_function.errors import RemoteFunctionError +from sagemaker.core.remote_function.core.serialization import CloudpickleSerializer +from sagemaker.core.s3.client import S3Uploader +from sagemaker.core.helper.session_helper import s3_path_join +from sagemaker.core.workflow.conditions import ConditionLessThan, ConditionEquals +from sagemaker.core.workflow.parameters import ( + ParameterString, + ParameterInteger, + ParameterFloat, + ParameterBoolean, +) +from sagemaker.core.workflow.step_outputs import get_step +from sagemaker.core import image_uris +from sagemaker.core.processing import ScriptProcessor +from sagemaker.core.shapes.shapes import ProcessingInput, ProcessingS3Input +from sagemaker.mlops.workflow.fail_step import FailStep +from sagemaker.mlops.workflow.function_step import step +from sagemaker.mlops.workflow.condition_step import ConditionStep +from sagemaker.mlops.workflow.pipeline import Pipeline +from sagemaker.mlops.workflow.steps import ProcessingStep + +from tests.integ.workflow.helpers import ( + create_and_execute_pipeline, + wait_pipeline_execution, +) +from tests.integ import DATA_DIR + +INSTANCE_TYPE = "ml.m5.large" + + +@pytest.fixture +def pipeline_name(): + return unique_name_from_base("Decorated-Step-Pipeline") + + +def test_compile_pipeline_with_function_steps( + sagemaker_session_for_pipeline, role, pipeline_name, region_name +): + @step( + name="generate", + role=role, + instance_type=INSTANCE_TYPE, + ) + def generate(): + """generates a random integer""" + return random.randint(0, 100) + + @step( + name="print", + role=role, + instance_type=INSTANCE_TYPE, + ) + def print_result(result): + print(result) + + generated = generate() + conditional_print = ConditionStep( + name="condition-step", + # TODO: replace with the generated result + conditions=[ConditionEquals(left=1, right=1)], + if_steps=[print_result(generated)], + ) + + pipeline = Pipeline( + name=pipeline_name, + sagemaker_session=sagemaker_session_for_pipeline, + steps=[generated, conditional_print], + ) + + try: + pipeline.create(role_arn=role) + + # verify the artifacts are uploaded to the location specified by sagemaker_session + assert ( + len( + sagemaker_session_for_pipeline.list_s3_files( + sagemaker_session_for_pipeline.default_bucket(), + f"{sagemaker_session_for_pipeline.default_bucket_prefix}/{pipeline_name}/generate", + ) + ) + > 0 + ) + + assert ( + len( + sagemaker_session_for_pipeline.list_s3_files( + sagemaker_session_for_pipeline.default_bucket(), + f"{sagemaker_session_for_pipeline.default_bucket_prefix}/{pipeline_name}/print", + ) + ) + > 0 + ) + finally:
try: + pipeline.delete() + except Exception: + pass + + +def test_step_decorator_no_dependencies( + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + @step( + role=role, + instance_type=INSTANCE_TYPE, + image_uri=dummy_container_without_error, + keep_alive_period_in_seconds=300, + ) + def sum(a, b): + """adds two numbers""" + return a + b + + step_output_a = sum(2, 3) + step_output_b = sum(5, 6) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_output_a, step_output_b], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=2, + last_step_name_prefix="sum", + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=int, + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_step_decorator_with_execution_dependencies( + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + @step( + role=role, + instance_type=INSTANCE_TYPE, + image_uri=dummy_container_without_error, + keep_alive_period_in_seconds=300, + ) + def sum(a, b): + """adds two numbers""" + return a + b + + step_output_a = sum(2, 3) + step_output_b = sum(5, 6) + get_step(step_output_b).add_depends_on([step_output_a]) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_output_b], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=2, + last_step_name_prefix="sum", + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=int, + wait_duration=800, + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_step_decorator_with_data_dependencies( + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + step_settings = dict( + role=role, + instance_type=INSTANCE_TYPE, + image_uri=dummy_container_without_error, + keep_alive_period_in_seconds=300, + ) + + @step(**step_settings) + def generator() -> tuple: + return 3, 4 + + @step(**step_settings) + def sum(a, b): + """adds two numbers""" + return a + b + + step_output_a = generator() + step_output_b = sum(step_output_a[0], step_output_a[1]) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_output_b], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=2, + last_step_name_prefix="sum", + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=int, + step_result_value=7, + wait_duration=800, + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_step_decorator_with_pipeline_parameters( + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error +): + os.environ["AWS_DEFAULT_REGION"] = region_name + instance_type = ParameterString(name="TrainingInstanceCount", default_value=INSTANCE_TYPE) + + @step( + role=role, + instance_type=instance_type, + image_uri=dummy_container_without_error, + keep_alive_period_in_seconds=300, + ) + def sum(a, b): + """adds two 
numbers""" + return a + b + + step_a = sum(2, 3) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[instance_type], + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix="sum", + execution_parameters=dict(TrainingInstanceCount="ml.m5.xlarge"), + step_status="Succeeded", + step_result_type=int, + step_result_value=5, + ) + + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_passing_different_pipeline_variables_to_function( + sagemaker_session_for_pipeline, + pipeline_session, + role, + pipeline_name, + region_name, + dummy_container_without_error, + sklearn_latest_version, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + param_a = ParameterInteger(name="param_a", default_value=2) + param_b = ParameterBoolean(name="param_b", default_value=True) + param_c = ParameterFloat(name="param_c", default_value=2.0) + param_d = ParameterString(name="param_d", default_value="string") + + script_path = os.path.join(DATA_DIR, "workflow", "dummy_script.py") + + sklearn_processor = ScriptProcessor( + image_uri=image_uris.retrieve( + framework="sklearn", + region=region_name, + version=sklearn_latest_version, + py_version="py3", + instance_type=INSTANCE_TYPE, + ), + instance_type=INSTANCE_TYPE, + instance_count=1, + base_job_name="test-sklearn", + sagemaker_session=pipeline_session, + role=role, + ) + + step_args = sklearn_processor.run(code=script_path) + step_sklearn = ProcessingStep( + name="sklearn-process", + step_args=step_args, + ) + + @step( + role=role, + instance_type=INSTANCE_TYPE, + image_uri=dummy_container_without_error, + keep_alive_period_in_seconds=600, + ) + def func_1(): + return 1, 2, {"key": 3} + + @step( + role=role, + instance_type=INSTANCE_TYPE, + image_uri=dummy_container_without_error, + keep_alive_period_in_seconds=300, + ) + def func_2(*args): + return args + + first_output = func_1() + final_output = func_2( + param_a, + param_b, + param_c, + param_d, + step_sklearn.properties.ProcessingJobStatus, + first_output[2]["key"], + ) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[param_a, param_b, param_c, param_d], + steps=[final_output], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=3, + last_step_name_prefix="func", + execution_parameters=dict(param_a=3), + step_status="Succeeded", + step_result_type=tuple, + step_result_value=(3, True, 2.0, "string", "Completed", 3), + wait_duration=600, + ) + + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_step_decorator_with_pre_execution_script( + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error +): + os.environ["AWS_DEFAULT_REGION"] = region_name + pre_execution_script_path = os.path.join(DATA_DIR, "workflow", "pre_exec_commands") + + @step( + role=role, + instance_type=INSTANCE_TYPE, + image_uri=dummy_container_without_error, + pre_execution_script=pre_execution_script_path, + keep_alive_period_in_seconds=300, + ) + def validate_file_exists(files_exists, files_does_not_exist): + for file_name in files_exists: + if not os.path.exists(file_name): + raise ValueError(f"file {file_name} should exist") + + for file_name in files_does_not_exist: + if 
os.path.exists(file_name): + raise ValueError(f"file {file_name} should not exist") + + step_a = validate_file_exists(["test_file_1", "test_file_3"], ["test_file_2"]) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix="validate_file_exists", + execution_parameters=dict(), + step_status="Succeeded", + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_step_decorator_with_include_local_workdir( + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + monkeypatch, + dummy_container_without_error, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + source_dir_path = os.path.join(os.path.dirname(__file__)) + original_sagemaker_config = sagemaker_session_for_pipeline.sagemaker_config + with monkeypatch.context() as m: + m.chdir(source_dir_path) + sagemaker_config = load_sagemaker_config( + [os.path.join(DATA_DIR, "workflow", "config.yaml")] + ) + sagemaker_session_for_pipeline.sagemaker_config = sagemaker_config + dependencies_path = os.path.join(DATA_DIR, "workflow", "requirements.txt") + + @step( + role=role, + instance_type=INSTANCE_TYPE, + dependencies=dependencies_path, + keep_alive_period_in_seconds=300, + image_uri=dummy_container_without_error, + ) + def train(x): + from workdir_helpers import local_module + from workdir_helpers.nested_helper import local_module2 + + output = local_module.square(x) + local_module2.cube(x) + print(output) + return output + + step_result = train(2) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_result], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix="train", + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=int, + step_result_value=12, + ) + finally: + try: + pipeline.delete() + except Exception: + pass + sagemaker_session_for_pipeline.sagemaker_config = original_sagemaker_config + + +def test_decorator_with_conda_env( + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_with_conda, + conda_env_yml, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + @step( + role=role, + image_uri=dummy_container_with_conda, + dependencies=conda_env_yml, + instance_type=INSTANCE_TYPE, + job_conda_env="integ_test_env", + ) + def cuberoot(x): + from scipy.special import cbrt + + return cbrt(x) + + step_a = cuberoot(8) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix="cuberoot", + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=numpy.float64, + step_result_value=2.0, + wait_duration=900, + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_decorator_step_failed( + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_without_error, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + @step( + role=role, + image_uri=dummy_container_without_error, + instance_type=INSTANCE_TYPE, + 
keep_alive_period_in_seconds=300, + ) + def divide(x, y): + return x / y + + step_a = divide(10, 0) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + execution, execution_steps = create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix="divide", + execution_parameters=dict(), + step_status="Failed", + ) + + step_name = execution_steps[0]["StepName"] + with pytest.raises(RemoteFunctionError) as e: + execution.result(step_name) + assert f"step {step_name} is not in Completed status." in str(e) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_decorator_step_with_json_get( + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_without_error, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + step_settings = dict( + role=role, + image_uri=dummy_container_without_error, + instance_type=INSTANCE_TYPE, + keep_alive_period_in_seconds=300, + ) + + @step(name="step1", **step_settings) + def func1() -> tuple: + return 0, 1 + + @step(name="step2", **step_settings) + def func2(): + return 2 + + @step(name="step3", **step_settings) + def func3(): + return 3 + + step_output1 = func1() + step_output2 = func2() + step_output3 = func3() + + cond_lt = ConditionLessThan(left=step_output1[1], right=step_output2) + + fail_step = FailStep( + name="MyFailStep", + error_message="Failed due to hitting in else branch", + ) + + cond_step = ConditionStep( + name="MyConditionStep", + conditions=[cond_lt], + if_steps=[], + else_steps=[fail_step], + depends_on=[step_output3], + ) + + pipeline = Pipeline( + name=pipeline_name, + steps=[cond_step], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=4, # The FailStep in else branch is not executed + last_step_name_prefix="MyConditionStep", + execution_parameters=dict(), + step_status="Succeeded", + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_decorator_step_data_referenced_by_other_steps( + pipeline_session, + role, + pipeline_name, + region_name, + dummy_container_without_error, + sklearn_latest_version, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + processing_job_instance_counts = 2 + + @step( + name="step1", + role=role, + image_uri=dummy_container_without_error, + instance_type=INSTANCE_TYPE, + keep_alive_period_in_seconds=300, + ) + def func(var: int): + return 1, var + + step_output = func(processing_job_instance_counts) + + script_path = os.path.join(DATA_DIR, "workflow", "dummy_script.py") + input_file_path = os.path.join(DATA_DIR, "workflow", "dummy_input.txt") + inputs = [ + ProcessingInput( + input_name="input-1", + s3_input=ProcessingS3Input( + s3_uri=input_file_path, + local_path="/opt/ml/processing/inputs/", + s3_data_type="S3Prefix", + ), + ), + ] + + sklearn_processor = ScriptProcessor( + image_uri=image_uris.retrieve( + framework="sklearn", + region=region_name, + version=sklearn_latest_version, + py_version="py3", + instance_type=INSTANCE_TYPE, + ), + instance_type=INSTANCE_TYPE, + instance_count=step_output[1], + base_job_name="test-sklearn", + sagemaker_session=pipeline_session, + role=role, + ) + + step_args = sklearn_processor.run( + inputs=inputs, + code=script_path, + ) + process_step = ProcessingStep( + 
name="MyProcessStep", + step_args=step_args, + ) + + pipeline = Pipeline( + name=pipeline_name, + steps=[process_step], + sagemaker_session=pipeline_session, + ) + + try: + _, execution_steps = create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=2, + last_step_name_prefix=process_step.name, + execution_parameters=dict(), + step_status="Succeeded", + wait_duration=1000, # seconds + ) + + execution_proc_job = pipeline_session.sagemaker_client.describe_processing_job( + ProcessingJobName=execution_steps[0]["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1] + ) + assert ( + execution_proc_job["ProcessingResources"]["ClusterConfig"]["InstanceCount"] + == processing_job_instance_counts + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_decorator_step_checksum_mismatch( + sagemaker_session_for_pipeline, dummy_container_without_error, pipeline_name, role +): + step_name = "original_func_step" + + @step( + name=step_name, + role=role, + image_uri=dummy_container_without_error, + instance_type=INSTANCE_TYPE, + keep_alive_period_in_seconds=300, + ) + def original_func(x): + return x * x + + def updated_func(x): + return x + 25 + + pickled_updated_func = CloudpickleSerializer.serialize(updated_func) + + step_a = original_func(10) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + pipeline.create(role) + + pipeline_definition = json.loads(pipeline.describe()["PipelineDefinition"]) + step_container_args = pipeline_definition["Steps"][0]["Arguments"][ + "AlgorithmSpecification" + ]["ContainerArguments"] + s3_base_uri = step_container_args[step_container_args.index("--s3_base_uri") + 1] + build_time = step_container_args[step_container_args.index("--func_step_s3_dir") + 1] + + # some other user updates the pickled function code + S3Uploader.upload_bytes( + pickled_updated_func, + s3_path_join(s3_base_uri, step_name, build_time, "function", "payload.pkl"), + kms_key=None, + sagemaker_session=sagemaker_session_for_pipeline, + ) + execution = pipeline.start() + wait_pipeline_execution(execution=execution, delay=20, max_attempts=20) + execution_steps = execution.list_steps() + + assert execution_steps[0]["StepStatus"] == "Failed" + assert ( + "Integrity check for the serialized function or data failed" + in execution_steps[0]["FailureReason"] + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_with_user_and_workdir_set_in_the_image( + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_with_user_and_workdir, +): + os.environ["AWS_DEFAULT_REGION"] = region_name + dependencies_path = os.path.join(DATA_DIR, "workflow", "requirements.txt") + + @step( + role=role, + image_uri=dummy_container_with_user_and_workdir, + dependencies=dependencies_path, + instance_type=INSTANCE_TYPE, + ) + def cuberoot(x): + from scipy.special import cbrt + + return cbrt(x) + + step_a = cuberoot(8) + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix="cuberoot", + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=numpy.float64, + step_result_value=2.0, + ) + finally: + try: + pipeline.delete() + except Exception: + 
pass + + +def test_with_user_and_workdir_set_in_the_image_client_error_case( + sagemaker_session_for_pipeline, + role, + pipeline_name, + region_name, + dummy_container_with_user_and_workdir, +): + # This test aims to ensure that a client error raised in a step-decorated + # function is surfaced correctly and fails the job. + os.environ["AWS_DEFAULT_REGION"] = region_name + client_error_message = "Testing client error in job." + + @step( + role=role, + image_uri=dummy_container_with_user_and_workdir, + instance_type=INSTANCE_TYPE, + ) + def my_func(): + raise RuntimeError(client_error_message) + + step_a = my_func() + + pipeline = Pipeline( + name=pipeline_name, + steps=[step_a], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + _, execution_steps = create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=1, + last_step_name_prefix=get_step(step_a).name, + execution_parameters=dict(), + step_status="Failed", + ) + assert ( + f"ClientError: AlgorithmError: RuntimeError('{client_error_message}')" + in execution_steps[0]["FailureReason"] + ) + finally: + try: + pipeline.delete() + except Exception: + pass + + +def test_step_level_serialization( + sagemaker_session_for_pipeline, role, pipeline_name, region_name, dummy_container_without_error +): + os.environ["AWS_DEFAULT_REGION"] = region_name + + _EXPECTED_STEP_A_OUTPUT = "This pipeline is a function." + _EXPECTED_STEP_B_OUTPUT = "This generates a function arg." + + step_config = dict( + role=role, + image_uri=dummy_container_without_error, + instance_type=INSTANCE_TYPE, + ) + + # This pipeline function may clash with the pipeline object + # defined below. + # However, if the function and its args are serialized at the + # step level, this clash won't happen. + def pipeline(): + return _EXPECTED_STEP_A_OUTPUT + + @step(**step_config) + def generator(): + return _EXPECTED_STEP_B_OUTPUT + + @step(**step_config) + def func_with_collision(var: str): + return f"{pipeline()} {var}" + + step_output_a = generator() + step_output_b = func_with_collision(step_output_a) + + pipeline = Pipeline( # noqa: F811 + name=pipeline_name, + steps=[step_output_b], + sagemaker_session=sagemaker_session_for_pipeline, + ) + + try: + create_and_execute_pipeline( + pipeline=pipeline, + pipeline_name=pipeline_name, + region_name=region_name, + role=role, + no_of_steps=2, + last_step_name_prefix=get_step(step_output_b).name, + execution_parameters=dict(), + step_status="Succeeded", + step_result_type=str, + step_result_value=f"{_EXPECTED_STEP_A_OUTPUT} {_EXPECTED_STEP_B_OUTPUT}", + wait_duration=800, + ) + finally: + try: + pipeline.delete() + except Exception: + pass diff --git a/sagemaker-mlops/tests/integ/workflow/workdir_helpers/__init__.py b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/__init__.py new file mode 100644 index 0000000000..6549052177 --- /dev/null +++ b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/__init__.py @@ -0,0 +1,12 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied.
See the License for the specific +# language governing permissions and limitations under the License. diff --git a/sagemaker-mlops/tests/integ/workflow/workdir_helpers/local_module.py b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/local_module.py new file mode 100644 index 0000000000..ac81ef9f5d --- /dev/null +++ b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/local_module.py @@ -0,0 +1,17 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + + +def square(x): + """Returns x squared.""" + return x * x diff --git a/sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/__init__.py b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/__init__.py new file mode 100644 index 0000000000..6549052177 --- /dev/null +++ b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/__init__.py @@ -0,0 +1,12 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. diff --git a/sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/local_module2.py b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/local_module2.py new file mode 100644 index 0000000000..f898d47018 --- /dev/null +++ b/sagemaker-mlops/tests/integ/workflow/workdir_helpers/nested_helper/local_module2.py @@ -0,0 +1,17 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+ + +def cube(x): + """Returns x cubed.""" + return x * x * x diff --git a/sagemaker-mlops/tests/unit/workflow/test_pipeline.py b/sagemaker-mlops/tests/unit/workflow/test_pipeline.py index 754d8b2773..e93f649f00 100644 --- a/sagemaker-mlops/tests/unit/workflow/test_pipeline.py +++ b/sagemaker-mlops/tests/unit/workflow/test_pipeline.py @@ -342,7 +342,7 @@ def test_get_function_step_result_wrong_container(mock_session): from sagemaker.mlops.workflow.pipeline import get_function_step_result step_list = [{"StepName": "step1", "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:123456789012:training-job/job"}}}] - mock_session.describe_training_job.return_value = { + mock_session.sagemaker_client.describe_training_job.return_value = { "AlgorithmSpecification": {"ContainerEntrypoint": ["python"]}, "OutputDataConfig": {"S3OutputPath": "s3://bucket/path"} } @@ -357,7 +357,7 @@ def test_get_function_step_result_incomplete_job(mock_session): from sagemaker.core.remote_function.errors import RemoteFunctionError step_list = [{"StepName": "step1", "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:123456789012:training-job/job"}}}] - mock_session.describe_training_job.return_value = { + mock_session.sagemaker_client.describe_training_job.return_value = { "AlgorithmSpecification": {"ContainerEntrypoint": JOBS_CONTAINER_ENTRYPOINT}, "OutputDataConfig": {"S3OutputPath": "s3://bucket/path"}, "TrainingJobStatus": "Failed", @@ -373,7 +373,7 @@ def test_get_function_step_result_success(mock_session): from sagemaker.core.remote_function.job import JOBS_CONTAINER_ENTRYPOINT step_list = [{"StepName": "step1", "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:123456789012:training-job/job"}}}] - mock_session.describe_training_job.return_value = { + mock_session.sagemaker_client.describe_training_job.return_value = { "AlgorithmSpecification": {"ContainerEntrypoint": JOBS_CONTAINER_ENTRYPOINT}, "OutputDataConfig": {"S3OutputPath": "s3://bucket/path/exec-id/step1/results"}, "TrainingJobStatus": "Completed", @@ -496,7 +496,7 @@ def test_pipeline_execution_result_terminal_failure(mock_session): mock_session.sagemaker_client.list_pipeline_execution_steps.return_value = { "PipelineExecutionSteps": [{"StepName": "step1", "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:123456789012:training-job/job"}}}] } - mock_session.describe_training_job.return_value = { + mock_session.sagemaker_client.describe_training_job.return_value = { "AlgorithmSpecification": {"ContainerEntrypoint": JOBS_CONTAINER_ENTRYPOINT}, "OutputDataConfig": {"S3OutputPath": "s3://bucket/path/exec-id/step1/results"}, "TrainingJobStatus": "Completed", @@ -514,7 +514,7 @@ def test_get_function_step_result_obsolete_s3_path(mock_session): from sagemaker.core.remote_function.job import JOBS_CONTAINER_ENTRYPOINT step_list = [{"StepName": "step1", "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-west-2:123456789012:training-job/job"}}}] - mock_session.describe_training_job.return_value = { + mock_session.sagemaker_client.describe_training_job.return_value = { "AlgorithmSpecification": {"ContainerEntrypoint": JOBS_CONTAINER_ENTRYPOINT}, "OutputDataConfig": {"S3OutputPath": "s3://bucket/different/path"}, "TrainingJobStatus": "Completed", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000..6549052177 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,12 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. diff --git a/tests/integ_helpers/__init__.py b/tests/integ_helpers/__init__.py new file mode 100644 index 0000000000..6549052177 --- /dev/null +++ b/tests/integ_helpers/__init__.py @@ -0,0 +1,12 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. diff --git a/tests/integ_helpers/container_build.py b/tests/integ_helpers/container_build.py new file mode 100644 index 0000000000..6cfc1ccded --- /dev/null +++ b/tests/integ_helpers/container_build.py @@ -0,0 +1,294 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Shared helpers for building and pushing ECR containers in integration tests. + +Both sagemaker-core and sagemaker-mlops integration test suites need to build +the same dummy Docker images and push them to ECR. This module centralises +that logic so the two conftest.py files can share it without duplication. + +Parallel-safety +--------------- +When tests are run with pytest-xdist (multiple workers), several workers may +reach the same fixture concurrently. We use two layers of file locking +(provided by the ``filelock`` package) to ensure: + +1. The SDK sdist is built exactly once and the resulting tar.gz is reused by + all workers (``build_sdk_tar_once``). +2. Each Docker image is built and pushed exactly once; subsequent workers read + the ECR URI from a small text file written by the first worker + (``build_container_once``). + +Both helpers rely on ``tmp_path_factory.getbasetemp().parent``, which +pytest-xdist guarantees is the *same* directory for every worker in a session. 
+""" +from __future__ import absolute_import + +import base64 +import os +import shutil +import subprocess +import tempfile +from contextlib import contextmanager + +import docker +import filelock +from docker.errors import BuildError + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +REPO_ACCOUNT_ID = "033110030271" +REPO_NAME = "remote-function-dummy-container" + +# --------------------------------------------------------------------------- +# Dockerfile templates (identical to those used in both conftest files) +# --------------------------------------------------------------------------- + +DOCKERFILE_TEMPLATE = ( + "FROM public.ecr.aws/docker/library/python:{py_version}-slim\n\n" + "RUN apt-get update -y \ + && apt-get install -y unzip curl\n\n" + "RUN curl 'https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip'" + " -o 'awscliv2.zip' \ + && unzip awscliv2.zip \ + && ./aws/install\n\n" + "COPY {source_archive} ./\n" + "RUN pip3 install --no-cache-dir '{source_archive}'\n" + "RUN rm {source_archive}\n" +) + +DOCKERFILE_TEMPLATE_WITH_CONDA = ( + "FROM public.ecr.aws/docker/library/python:{py_version}-slim\n\n" + 'SHELL ["/bin/bash", "-c"]\n' + "RUN apt-get update -y \ + && apt-get install -y unzip curl\n\n" + "RUN curl -L -O " + "'https://github.com/conda-forge/miniforge/releases/download/24.11.3-2/" + "Miniforge3-Linux-x86_64.sh' \ + && bash Miniforge3-Linux-x86_64.sh -b -p '/opt/conda' \ + && /opt/conda/bin/conda init bash\n\n" + "ENV PATH $PATH:/opt/conda/bin\n" + "RUN mamba create -n integ_test_env python={py_version} -y \ + && mamba create -n default_env python={py_version} -y\n" + "COPY {source_archive} ./\n" + "RUN pip install --no-cache-dir '{source_archive}' \ + && mamba run -n base pip install --no-cache-dir '{source_archive}' \ + && mamba run -n default_env pip install --no-cache-dir '{source_archive}' \ + && mamba run -n integ_test_env pip install --no-cache-dir '{source_archive}'\n" + "ENV SHELL=/bin/bash\n" + "ENV SAGEMAKER_JOB_CONDA_ENV=default_env\n" +) + +DOCKERFILE_TEMPLATE_WITH_USER_AND_WORKDIR = ( + "FROM public.ecr.aws/docker/library/python:{py_version}-slim\n\n" + "RUN apt-get update -y \ + && apt-get install -y unzip curl\n\n" + "RUN curl 'https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip'" + " -o 'awscliv2.zip' \ + && unzip awscliv2.zip \ + && ./aws/install\n\n" + "RUN apt install -y sudo\n" + "RUN useradd -ms /bin/bash integ-test-user\n" + "RUN usermod -aG sudo integ-test-user\n" + "RUN echo '%sudo ALL= (ALL) NOPASSWD:ALL' >> /etc/sudoers\n" + "USER integ-test-user\n" + "WORKDIR /home/integ-test-user\n" + "COPY {source_archive} ./\n" + "RUN pip install --no-cache-dir '{source_archive}'\n" + "RUN rm {source_archive}\n" +) + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def build_sdk_tar_once(tmp_path_factory): + """Build the SDK sdist exactly once across all xdist workers. + + Uses a file lock so only the first worker runs ``python -m build``; all + others wait and then reuse the already-built tar.gz. + + Args: + tmp_path_factory: pytest's ``tmp_path_factory`` fixture. + + Returns: + str: Absolute path to the built ``.tar.gz`` file. 
+ """ + root_tmp = tmp_path_factory.getbasetemp().parent + tar_dir = root_tmp / "sagemaker_sdk_tar" + tar_dir.mkdir(exist_ok=True) + lock_file = root_tmp / "sagemaker_sdk_tar.lock" + + with filelock.FileLock(str(lock_file)): + existing = list(tar_dir.glob("*.tar.gz")) + if not existing: + _generate_sdk_tar(str(tar_dir)) + existing = list(tar_dir.glob("*.tar.gz")) + + return str(existing[0]) + + +def build_container_once(fixture_name, sagemaker_session, py_version, docker_template, + sdk_tar_path, tmp_path_factory, is_auto_capture=False, + extra_files_hook=None): + """Build and push a Docker image exactly once across all xdist workers. + + The first worker to acquire the lock builds the image and writes the ECR + URI to a small text file. All subsequent workers simply read that file. + + Args: + fixture_name (str): A unique key for this image (used for lock/URI files). + sagemaker_session: A SageMaker ``Session`` (or compatible) object. + py_version (str): Python version string, e.g. ``"3.10"``. + docker_template (str): Dockerfile template string with ``{py_version}`` + and ``{source_archive}`` placeholders. + sdk_tar_path (str): Path to the pre-built SDK ``.tar.gz``. + tmp_path_factory: pytest's ``tmp_path_factory`` fixture. + is_auto_capture (bool): If True, build a local-only image (no ECR push) + and return the local image ID instead of an ECR URI. + extra_files_hook (callable | None): Optional ``fn(tmpdir: str)`` called + after the SDK tar is copied into the build context but before the + Docker build runs. Use this to copy test-specific files (e.g. + ``test_auto_capture.py``) into the build context. + + Returns: + str: ECR image URI (or local Docker image ID when ``is_auto_capture=True``). + """ + root_tmp = tmp_path_factory.getbasetemp().parent + uri_file = root_tmp / f"{fixture_name}.ecr_uri" + lock_file = root_tmp / f"{fixture_name}.lock" + + with filelock.FileLock(str(lock_file)): + if uri_file.exists(): + return uri_file.read_text().strip() + if is_auto_capture: + ecr_uri = _build_auto_capture(py_version, docker_template, sdk_tar_path, + extra_files_hook) + else: + ecr_uri = _build_and_push(sagemaker_session, py_version, docker_template, sdk_tar_path) + uri_file.write_text(ecr_uri) + + return ecr_uri + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +@contextmanager +def _tmpdir(): + tmpdir = tempfile.mkdtemp() + try: + yield tmpdir + finally: + shutil.rmtree(tmpdir) + + +def _build_auto_capture(py_version, docker_template, sdk_tar_path, extra_files_hook=None): + """Build a local-only Docker image for auto-capture tests and return its ID.""" + with _tmpdir() as tmpdir: + source_archive = os.path.basename(sdk_tar_path) + shutil.copy2(sdk_tar_path, os.path.join(tmpdir, source_archive)) + if extra_files_hook: + extra_files_hook(tmpdir) + with open(os.path.join(tmpdir, "Dockerfile"), "w") as f: + f.write(docker_template.format(py_version=py_version, source_archive=source_archive)) + docker_client = docker.from_env() + image, _ = docker_client.images.build(path=tmpdir, tag=REPO_NAME, rm=True) + return image.id + + +def _build_and_push(sagemaker_session, py_version, docker_template, sdk_tar_path): + """Build a Docker image locally and push it to ECR.""" + region = sagemaker_session.boto_region_name + ecr_client = sagemaker_session.boto_session.client("ecr") + username, password = _ecr_login(ecr_client) + + # Import lazily to support both sagemaker-core 
and sagemaker-mlops layouts. + try: + from sagemaker.core.common_utils import sagemaker_timestamp + except ImportError: + from sagemaker.utils import sagemaker_timestamp + + image_tag = f"{py_version.replace('.', '-')}-{sagemaker_timestamp()}" + + with _tmpdir() as tmpdir: + source_archive = os.path.basename(sdk_tar_path) + shutil.copy2(sdk_tar_path, os.path.join(tmpdir, source_archive)) + with open(os.path.join(tmpdir, "Dockerfile"), "w") as f: + f.write(docker_template.format(py_version=py_version, source_archive=source_archive)) + + docker_client = docker.from_env() + try: + image, _ = docker_client.images.build( + path=tmpdir, tag=REPO_NAME, rm=True, platform="linux/amd64", pull=True + ) + except BuildError as e: + for line in e.build_log: + if "stream" in line: + print(line["stream"].strip()) + raise + + # Resolve the ECR account: use the caller's account if the repo exists + # there, otherwise fall back to the shared REPO_ACCOUNT_ID. + if _is_repository_exists(ecr_client, REPO_NAME): + try: + from sagemaker.core.common_utils import sts_regional_endpoint + except ImportError: + from sagemaker.utils import sts_regional_endpoint + + sts_client = sagemaker_session.boto_session.client( + "sts", region_name=region, endpoint_url=sts_regional_endpoint(region) + ) + account_id = sts_client.get_caller_identity()["Account"] + else: + account_id = REPO_ACCOUNT_ID + + ecr_image = _ecr_image_uri(account_id, region, REPO_NAME, image_tag) + image.tag(ecr_image, tag=image_tag) + docker_client.images.push(ecr_image, auth_config={"username": username, "password": password}) + return ecr_image + + +def _is_repository_exists(ecr_client, repo_name): + try: + ecr_client.describe_repositories(repositoryNames=[repo_name]) + return True + except ecr_client.exceptions.RepositoryNotFoundException: + return False + + +def _ecr_login(ecr_client): + login = ecr_client.get_authorization_token() + b64token = login["authorizationData"][0]["authorizationToken"].encode("utf-8") + username, password = base64.b64decode(b64token).decode("utf-8").split(":") + return username, password + + +def _ecr_image_uri(account, region, image_name, tag): + return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account, region, image_name, tag) + + +def _generate_sdk_tar(destination_folder): + """Run ``python -m build --sdist`` and return the archive filename.""" + command = f"python -m build --sdist -o {destination_folder}" + try: + result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True) + print(result.stdout) + print(result.stderr) + except subprocess.CalledProcessError as e: + print(f"SDK sdist build failed:\n{e.stderr}") + raise + archives = [f for f in os.listdir(destination_folder) if f.endswith(".tar.gz")] + return archives[0]
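
The parallel-safety scheme in container_build.py boils down to double-checked locking around a filesystem cache. Stripped of the Docker and ECR specifics, the pattern looks like the sketch below; build_once, cache_key, and expensive_build are hypothetical stand-ins introduced for illustration, not names from this patch.

import filelock

def build_once(tmp_path_factory, cache_key, expensive_build):
    # getbasetemp().parent is the one directory pytest-xdist shares
    # across every worker in a session.
    root_tmp = tmp_path_factory.getbasetemp().parent
    result_file = root_tmp / f"{cache_key}.result"
    lock_file = root_tmp / f"{cache_key}.lock"

    with filelock.FileLock(str(lock_file)):
        # Checked under the lock: workers that lost the race reuse the
        # result the winner recorded instead of rebuilding.
        if result_file.exists():
            return result_file.read_text().strip()
        result = expensive_build()  # only the first worker pays this cost
        result_file.write_text(result)  # expensive_build returns a str
        return result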
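
A conftest.py consumes the two public helpers roughly as follows. This is a sketch: the session-scoped sagemaker_session fixture and the "3.10" version pin are assumptions, not part of this patch.

import pytest

from tests.integ_helpers.container_build import (
    DOCKERFILE_TEMPLATE,
    build_container_once,
    build_sdk_tar_once,
)


@pytest.fixture(scope="session")
def dummy_container_without_error(sagemaker_session, tmp_path_factory):
    # Both calls are effectively no-ops for every worker after the first.
    sdk_tar = build_sdk_tar_once(tmp_path_factory)
    return build_container_once(
        fixture_name="dummy_container_without_error",
        sagemaker_session=sagemaker_session,
        py_version="3.10",
        docker_template=DOCKERFILE_TEMPLATE,
        sdk_tar_path=sdk_tar,
        tmp_path_factory=tmp_path_factory,
    )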
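
All of the decorator tests above funnel through the shared create_and_execute_pipeline helper, whose body sits outside this hunk. Its call sites pin down the contract well enough for an inferred sketch (not the shipped helper; the wait_duration default and the polling granularity are guesses):

def create_and_execute_pipeline(
    pipeline, pipeline_name, region_name, role,
    no_of_steps, last_step_name_prefix, execution_parameters,
    step_status, step_result_type=None, step_result_value=None,
    wait_duration=400,
):
    pipeline.create(role)
    execution = pipeline.start(parameters=execution_parameters)
    # wait_pipeline_execution is the same polling helper the checksum
    # test above calls directly.
    wait_pipeline_execution(execution=execution, delay=20,
                            max_attempts=max(wait_duration // 20, 1))
    # list_steps() returns steps in reverse chronological order, so
    # index 0 is the step that finished last.
    execution_steps = execution.list_steps()
    assert len(execution_steps) == no_of_steps
    assert execution_steps[0]["StepName"].startswith(last_step_name_prefix)
    assert execution_steps[0]["StepStatus"] == step_status
    if step_result_type is not None:
        result = execution.result(execution_steps[0]["StepName"])
        assert isinstance(result, step_result_type)
        if step_result_value is not None:
            assert result == step_result_value
    return execution, execution_steps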
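
test_decorator_step_checksum_mismatch deserves a closer look: it rewrites payload.pkl in S3 after the pipeline is created and expects the execution to fail with "Integrity check for the serialized function or data failed". The sign-then-verify idea behind such a check is sketched below with the cryptography package, purely as an illustration of the concept rather than the SDK's actual scheme:

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec

# At pipeline-definition time: sign the serialized payload.
signing_key = ec.generate_private_key(ec.SECP256R1())
payload = b"cloudpickled function bytes"  # stand-in for the real payload
signature = signing_key.sign(payload, ec.ECDSA(hashes.SHA256()))

# Inside the job, before deserializing: a payload swapped in S3 no
# longer matches the signature, so verify() raises InvalidSignature.
verifying_key = signing_key.public_key()
try:
    verifying_key.verify(signature, payload, ec.ECDSA(hashes.SHA256()))
except InvalidSignature:
    raise RuntimeError(
        "Integrity check for the serialized function or data failed"
    )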