From ac5f7b78ba60d5b56acba78fe53e6fa9ea918688 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Tue, 28 Apr 2026 14:38:13 -0700 Subject: [PATCH 01/15] feat(feature-processor): Add use_lake_formation_credentials parameter Add configurable use_lake_formation_credentials parameter to the @feature_processor decorator, defaulting to False. The value flows through FeatureProcessorConfig to the Spark connector's ingest_data() call, enabling Lake Formation credential vending when set to True. --- X-AI-Prompt: make useLakeFormationCreds configurable, defaults to False, passed to feature_processor X-AI-Tool: kiro-cli --- .../feature_processor/_feature_processor_config.py | 3 +++ .../feature_store/feature_processor/_udf_output_receiver.py | 1 + .../feature_store/feature_processor/feature_processor.py | 4 ++++ .../feature_processor/test_udf_output_receiver.py | 1 + 4 files changed, 9 insertions(+) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py index f5d4dd91f3..1fab16a640 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py @@ -47,6 +47,7 @@ class FeatureProcessorConfig: parameters: Optional[Dict[str, Union[str, Dict]]] = attr.ib() enable_ingestion: bool = attr.ib() spark_config: Dict[str, str] = attr.ib() + use_lake_formation_credentials: bool = attr.ib() @staticmethod def create( @@ -59,6 +60,7 @@ def create( parameters: Optional[Dict[str, Union[str, Dict]]], enable_ingestion: bool, spark_config: Dict[str, str], + use_lake_formation_credentials: bool = False, ) -> "FeatureProcessorConfig": """Static initializer.""" return FeatureProcessorConfig( @@ -69,4 +71,5 @@ def create( parameters=parameters, enable_ingestion=enable_ingestion, spark_config=spark_config, + use_lake_formation_credentials=use_lake_formation_credentials, ) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py index a037e837c2..08fc280f46 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py @@ -81,6 +81,7 @@ def ingest_udf_output(self, output: DataFrame, fp_config: FeatureProcessorConfig input_data_frame=output, feature_group_arn=fp_config.output, target_stores=fp_config.target_stores, + use_lake_formation_credentials=fp_config.use_lake_formation_credentials ) except Py4JJavaError as e: if e.java_exception.getClass().getSimpleName() == "StreamIngestionFailureException": diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py index 31593a3f1c..214c49109a 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py @@ -40,6 +40,7 @@ def feature_processor( parameters: Optional[Dict[str, Union[str, Dict]]] = None, enable_ingestion: bool = True, spark_config: Dict[str, str] = None, + use_lake_formation_credentials: bool = False, ) -> Callable: """Decorator to facilitate feature engineering for Feature Groups. @@ -96,6 +97,8 @@ def transform(input_feature_group, input_csv): development phase to ensure that data is not used until the function is ready. It also useful for users that want to manage their own data ingestion. Defaults to True. spark_config (Dict[str, str]): A dict contains the key-value paris for Spark configurations. + use_lake_formation_credentials (bool, optional): Whether to use Lake Formation credential + vending for data ingestion. Defaults to False. Raises: IngestionError: If any rows are not ingested successfully then a sample of the records, @@ -114,6 +117,7 @@ def decorator(udf: Callable[..., Any]) -> Callable: parameters=parameters, enable_ingestion=enable_ingestion, spark_config=spark_config, + use_lake_formation_credentials=use_lake_formation_credentials, ) validator_chain = ValidatorFactory.get_validation_chain(fp_config) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py index 34770a011a..535bec725a 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py @@ -64,6 +64,7 @@ def test_ingest_udf_output(df, feature_store_manager, spark_output_receiver): input_data_frame=df, feature_group_arn=fp_config.output, target_stores=fp_config.target_stores, + use_lake_formation_credentials=fp_config.use_lake_formation_credentials, ) From 18cd319ef54dab57e43cc8e9044ad6cb5168bb34 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Tue, 28 Apr 2026 14:41:18 -0700 Subject: [PATCH 02/15] feat(feature-processor): Add signing key for stored function integrity Generate ECDSA signing key in ConfigUploader and pass it to StoredFunction for function payload signature verification. The public key PEM is returned to callers for remote-side verification. --- X-AI-Prompt: fix StoredFunction missing signing_key error in feature_processor pipeline X-AI-Tool: kiro-cli --- .../feature_processor/_config_uploader.py | 29 +++++++++++++++---- .../feature_processor/test_config_uploader.py | 1 + 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py index d181218fb5..c1db9d19d7 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py @@ -15,6 +15,8 @@ from typing import Callable, Dict, Optional, Tuple, List, Union import attr +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import serialization as crypto_serialization from sagemaker.core.helper.session_helper import Session from sagemaker.mlops.feature_store.feature_processor._constants import ( @@ -52,13 +54,13 @@ class ConfigUploader: def prepare_step_input_channel_for_spark_mode( self, func: Callable, s3_base_uri: str, sagemaker_session: Session - ) -> Tuple[List[Channel], Dict]: + ) -> Tuple[List[Channel], Dict, str]: """Prepares input channels for SageMaker Pipeline Step. Returns: - Tuple of (List[Channel], spark_dependency_paths dict) + Tuple of (List[Channel], spark_dependency_paths dict, public_key_pem str) """ - self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session) + public_key_pem = self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session) bootstrap_scripts_s3uri = self._prepare_and_upload_runtime_scripts( self.remote_decorator_config.spark_config, s3_base_uri, @@ -139,18 +141,33 @@ def prepare_step_input_channel_for_spark_mode( SPARK_JAR_FILES_PATH: submit_jars_s3_paths, SPARK_PY_FILES_PATH: submit_py_files_s3_paths, SPARK_FILES_PATH: submit_files_s3_path, - } + }, public_key_pem def _prepare_and_upload_callable( self, func: Callable, s3_base_uri: str, sagemaker_session: Session - ) -> None: - """Prepares and uploads callable to S3""" + ) -> str: + """Prepares and uploads callable to S3. + + Returns: + str: The public key PEM string for signature verification on the remote side. + """ + private_key = ec.generate_private_key(ec.SECP256R1()) + public_key_pem = ( + private_key.public_key() + .public_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PublicFormat.SubjectPublicKeyInfo, + ) + .decode("utf-8") + ) stored_function = StoredFunction( sagemaker_session=sagemaker_session, s3_base_uri=s3_base_uri, + signing_key=private_key, s3_kms_key=self.remote_decorator_config.s3_kms_key, ) stored_function.save(func) + return public_key_pem def _prepare_and_upload_workspace( self, diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py index 25ded72266..b181378683 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py @@ -238,6 +238,7 @@ def test_prepare_step_input_channel( ( input_data_config, spark_dependency_paths, + _public_key_pem, ) = config_uploader.prepare_step_input_channel_for_spark_mode( wrapped_func, config_uploader.remote_decorator_config.s3_root_uri, From 1aa4b3dcc24560679607065d3eb97bc23a006615 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Tue, 28 Apr 2026 14:55:33 -0700 Subject: [PATCH 03/15] feat(feature-processor): Add Spark image resolver for dynamic image URI Add _image_resolver module that resolves the SageMaker Spark processing container image URI based on installed PySpark and Python versions. Supports Spark 3.1/3.2/3.3/3.5 with appropriate Python version mapping. Uses container_version=v1 as a floating tag. --- X-AI-Prompt: add image resolver with container_version v1 for spark processing image X-AI-Tool: kiro-cli --- .../feature_processor/_image_resolver.py | 65 +++++++++++ .../feature_processor/test_image_resolver.py | 104 ++++++++++++++++++ 2 files changed, 169 insertions(+) create mode 100644 sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py create mode 100644 sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py new file mode 100644 index 0000000000..c19bc0fdda --- /dev/null +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py @@ -0,0 +1,65 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Resolves SageMaker Spark container image URIs based on installed PySpark and Python versions.""" +from __future__ import absolute_import + +import sys + +import pyspark + +from sagemaker.core import image_uris + +SPARK_IMAGE_SUPPORT_MATRIX = { + "3.1": ["py37"], + "3.2": ["py39"], + "3.3": ["py39"], + "3.5": ["py39", "py312"], +} + + +def _get_spark_image_uri(session): + """Resolve the SageMaker Spark container image URI for the installed PySpark and Python versions. + + Args: + session: SageMaker Session with boto_region_name attribute. + + Returns: + str: The ECR image URI for the matching Spark container. + + Raises: + ValueError: If the Spark/Python version combination is not supported. + """ + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + py_version = f"py{sys.version_info[0]}{sys.version_info[1]}" + + supported_py = SPARK_IMAGE_SUPPORT_MATRIX.get(spark_version) + if supported_py is None: + supported = ", ".join(sorted(SPARK_IMAGE_SUPPORT_MATRIX.keys())) + raise ValueError( + f"No SageMaker Spark container image available for Spark {spark_version}. " + f"Supported versions for remote execution: {supported}." + ) + + if py_version not in supported_py: + raise ValueError( + f"SageMaker Spark {spark_version} container images support " + f"{', '.join(supported_py)}. Current Python version: {py_version}." + ) + + return image_uris.retrieve( + framework="spark", + region=session.boto_region_name, + version=spark_version, + py_version=py_version, + container_version="v1", + ) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py new file mode 100644 index 0000000000..965dc2c8a4 --- /dev/null +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py @@ -0,0 +1,104 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import + +import sys + +import pyspark +import pytest +from mock import Mock, patch + +from sagemaker.mlops.feature_store.feature_processor._image_resolver import _get_spark_image_uri + + +@patch("sagemaker.mlops.feature_store.feature_processor._image_resolver.image_uris.retrieve") +def test_spark_33_py39(mock_retrieve): + mock_retrieve.return_value = "123456.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.3-cpu-py39-v1" + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.3.2"), \ + patch.object(sys, "version_info", (3, 9, 0)): + result = _get_spark_image_uri(session) + mock_retrieve.assert_called_once_with( + framework="spark", + region="us-west-2", + version="3.3", + py_version="py39", + container_version="v1", + ) + assert result == mock_retrieve.return_value + + +@patch("sagemaker.mlops.feature_store.feature_processor._image_resolver.image_uris.retrieve") +def test_spark_35_py39(mock_retrieve): + mock_retrieve.return_value = "123456.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py39-v1" + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.5.1"), \ + patch.object(sys, "version_info", (3, 9, 0)): + result = _get_spark_image_uri(session) + mock_retrieve.assert_called_once_with( + framework="spark", + region="us-west-2", + version="3.5", + py_version="py39", + container_version="v1", + ) + assert result == mock_retrieve.return_value + + +@patch("sagemaker.mlops.feature_store.feature_processor._image_resolver.image_uris.retrieve") +def test_spark_35_py312(mock_retrieve): + mock_retrieve.return_value = "123456.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py312-v1" + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.5.1"), \ + patch.object(sys, "version_info", (3, 12, 0)): + result = _get_spark_image_uri(session) + mock_retrieve.assert_called_once_with( + framework="spark", + region="us-west-2", + version="3.5", + py_version="py312", + container_version="v1", + ) + assert result == mock_retrieve.return_value + + +def test_spark_34_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.4.1"), \ + patch.object(sys, "version_info", (3, 9, 0)): + with pytest.raises(ValueError, match="No SageMaker Spark container image available for Spark 3.4"): + _get_spark_image_uri(session) + + +def test_spark_35_py310_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.5.1"), \ + patch.object(sys, "version_info", (3, 10, 0)): + with pytest.raises(ValueError, match="SageMaker Spark 3.5 container images support"): + _get_spark_image_uri(session) + + +def test_spark_33_py312_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.3.2"), \ + patch.object(sys, "version_info", (3, 12, 0)): + with pytest.raises(ValueError, match="SageMaker Spark 3.3 container images support"): + _get_spark_image_uri(session) + + +def test_unknown_spark_version_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.6.0"), \ + patch.object(sys, "version_info", (3, 9, 0)): + with pytest.raises(ValueError, match="No SageMaker Spark container image available for Spark 3.6"): + _get_spark_image_uri(session) From 87960031a359bf8ddbdec9f2cd45d296421b38e8 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Tue, 28 Apr 2026 14:59:08 -0700 Subject: [PATCH 04/15] feat(feature-processor): Use image resolver and pass signing key in scheduler Update feature_scheduler to use _get_spark_image_uri for dynamic image resolution instead of _JobSettings._get_default_spark_image. Thread public_key_pem from ConfigUploader through to ModelTrainer environment as REMOTE_FUNCTION_SECRET_KEY. Allow user-provided image_uri to take precedence over auto-resolved URI. --- X-AI-Prompt: integrate image resolver and signing key into feature scheduler pipeline X-AI-Tool: kiro-cli --- .../feature_processor/feature_scheduler.py | 9 ++++++- .../test_feature_scheduler.py | 24 +++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py index c18232cb08..e0c57a9afb 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py @@ -98,6 +98,7 @@ FeatureProcessorLineageHandler, TransformationCode, ) +from sagemaker.mlops.feature_store.feature_processor._image_resolver import _get_spark_image_uri from sagemaker.core.remote_function.job import ( _JobSettings, @@ -174,6 +175,7 @@ def to_pipeline( ( input_data_config, spark_dependency_paths, + public_key_pem, ) = config_uploader.prepare_step_input_channel_for_spark_mode( func=getattr(step, "wrapped_func", step), s3_base_uri=s3_base_uri, @@ -194,6 +196,7 @@ def to_pipeline( spark_dependency_paths=spark_dependency_paths, pipeline_session=pipeline_session, role=_role, + public_key_pem=public_key_pem, ) step_args = model_trainer.train(input_data_config=input_data_config) @@ -846,6 +849,7 @@ def _prepare_model_trainer_from_remote_decorator_config( spark_dependency_paths: Dict[str, Optional[str]], pipeline_session: PipelineSession, role: str, + public_key_pem: str = None, ) -> ModelTrainer: """Prepares a ModelTrainer instance from remote decorator configuration. @@ -866,6 +870,8 @@ def _prepare_model_trainer_from_remote_decorator_config( # Build environment dict from remote_decorator_config environment = dict(remote_decorator_config.environment_variables or {}) + if public_key_pem: + environment["REMOTE_FUNCTION_SECRET_KEY"] = public_key_pem # Build command from container entry point and arguments entry_point_and_args = _get_container_entry_point_and_arguments( @@ -1039,7 +1045,8 @@ def _get_remote_decorator_config_from_input( # TODO: This needs to be removed when new mode is introduced. if remote_decorator_config.spark_config is None: remote_decorator_config.spark_config = SparkConfig() - remote_decorator_config.image_uri = _JobSettings._get_default_spark_image(sagemaker_session) + if not remote_decorator_config.image_uri: + remote_decorator_config.image_uri = _get_spark_image_uri(sagemaker_session) return remote_decorator_config diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py index 9cd5655de5..9173ea0421 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py @@ -162,6 +162,10 @@ def config_uploader(): return uploader +@patch( + "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) @patch( "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._validate_fg_lineage_resources", return_value=None, @@ -171,7 +175,7 @@ def config_uploader(): return_value=mock_pipeline(), ) @patch( - "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.mlops.feature_store.feature_processor._config_uploader.TrainingInput") @@ -470,9 +474,13 @@ def test_to_pipeline_not_wrapped_by_remote(get_execution_role, session): ) -@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) @patch( "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) +@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) +@patch( + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN) @@ -522,9 +530,13 @@ def test_to_pipeline_wrong_mode(get_execution_role, mock_spark_image, session): ) -@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) @patch( "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) +@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) +@patch( + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN) @@ -577,9 +589,13 @@ def test_to_pipeline_pipeline_name_length_limit_exceeds( ) -@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) @patch( "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) +@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) +@patch( + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN) From 8d8847f1b589d52ba983ef9d2f16f765e830e90f Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Tue, 28 Apr 2026 15:07:40 -0700 Subject: [PATCH 05/15] fix(feature-processor): Dynamic Hadoop version and always-on Feature Store JARs Resolve Hadoop version dynamically based on installed PySpark version instead of hardcoding 3.3.1. Move Feature Store JAR classpath setup outside the non-training-job guard so spark.jars is always set, fixing FeatureStoreManager class loading in training job mode. --- X-AI-Prompt: fix spark factory hadoop version and jar classpath for spark 3.5 X-AI-Tool: kiro-cli --- .../feature_processor/_spark_factory.py | 56 ++++++++++---- .../test_feature_processor_spark_compat.py | 76 +++++++++++++++++++ .../test_spark_session_factory.py | 42 +++++++++- 3 files changed, 157 insertions(+), 17 deletions(-) create mode 100644 sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py index d304185e85..43a507a993 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py @@ -13,10 +13,12 @@ """Contains factory classes for instantiating Spark objects.""" from __future__ import absolute_import +import logging from functools import lru_cache from typing import List, Tuple, Dict import feature_store_pyspark +import pyspark import feature_store_pyspark.FeatureStoreManager as fsm from pyspark.conf import SparkConf from pyspark.context import SparkContext @@ -26,6 +28,32 @@ SPARK_APP_NAME = "FeatureProcessor" +logger = logging.getLogger(__name__) + +SPARK_TO_HADOOP_MAP = { + "3.1": "3.2.0", + "3.2": "3.3.1", + "3.3": "3.3.2", + "3.4": "3.3.4", + "3.5": "3.3.4", +} + +_DEFAULT_HADOOP_VERSION = "3.3.4" + +def _get_hadoop_version(): + """Resolve the Hadoop version for the installed PySpark version.""" + spark_version = pyspark.__version__ + major_minor = ".".join(spark_version.split(".")[:2]) + hadoop_version = SPARK_TO_HADOOP_MAP.get(major_minor) + if hadoop_version is None: + hadoop_version = _DEFAULT_HADOOP_VERSION + logger.warning( + "Unknown Spark version %s. Falling back to Hadoop %s.", + spark_version, + hadoop_version, + ) + return hadoop_version + class SparkSessionFactory: """Lazy loading, memoizing, instantiation of SparkSessions. @@ -115,28 +143,28 @@ def _get_spark_configs(self, is_training_job) -> List[Tuple[str, str]]: spark_configs.extend(self.spark_config.items()) if not is_training_job: - fp_spark_jars = feature_store_pyspark.classpath_jars() + hadoop_version = _get_hadoop_version() fp_spark_packages = [ - "org.apache.hadoop:hadoop-aws:3.3.1", - "org.apache.hadoop:hadoop-common:3.3.1", + f"org.apache.hadoop:hadoop-aws:{hadoop_version}", + f"org.apache.hadoop:hadoop-common:{hadoop_version}", ] - if self.spark_config and "spark.jars" in self.spark_config: - fp_spark_jars.append(self.spark_config.get("spark.jars")) - if self.spark_config and "spark.jars.packages" in self.spark_config: fp_spark_packages.append(self.spark_config.get("spark.jars.packages")) - spark_configs.extend( - ( - ("spark.jars", ",".join(fp_spark_jars)), - ( - "spark.jars.packages", - ",".join(fp_spark_packages), - ), - ) + spark_configs.append( + ("spark.jars.packages", ",".join(fp_spark_packages)) ) + # Always add Feature Store JARs so they are on the classpath + # regardless of whether we are in a training job or not. + fp_spark_jars = feature_store_pyspark.classpath_jars() + + if self.spark_config and "spark.jars" in self.spark_config: + fp_spark_jars.append(self.spark_config.get("spark.jars")) + + spark_configs.append(("spark.jars", ",".join(fp_spark_jars))) + return spark_configs def _get_jsc_hadoop_configs(self) -> List[Tuple[str, str]]: diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py new file mode 100644 index 0000000000..12d798a00e --- /dev/null +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py @@ -0,0 +1,76 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Integration tests for Spark multi-version compatibility.""" +from __future__ import absolute_import + +import pyspark +import pytest +from mock import Mock + +from sagemaker.mlops.feature_store.feature_processor._spark_factory import ( + SparkSessionFactory, + SPARK_TO_HADOOP_MAP, + _get_hadoop_version, +) +from sagemaker.mlops.feature_store.feature_processor._image_resolver import ( + SPARK_IMAGE_SUPPORT_MATRIX, + _get_spark_image_uri, +) + + +@pytest.mark.slow_test +def test_hadoop_version_resolves_for_installed_pyspark(): + """Verify that the installed PySpark version resolves to a known Hadoop version.""" + hadoop_version = _get_hadoop_version() + spark_major_minor = ".".join(pyspark.__version__.split(".")[:2]) + + if spark_major_minor in SPARK_TO_HADOOP_MAP: + assert hadoop_version == SPARK_TO_HADOOP_MAP[spark_major_minor] + else: + # Unknown version falls back to latest + assert hadoop_version == "3.3.4" + + +@pytest.mark.slow_test +def test_spark_session_factory_configs_include_dynamic_hadoop(): + """Verify SparkSessionFactory produces configs with the correct Hadoop Maven coordinates.""" + env_helper = Mock() + factory = SparkSessionFactory(env_helper) + configs = dict(factory._get_spark_configs(is_training_job=False)) + + hadoop_version = _get_hadoop_version() + packages = configs.get("spark.jars.packages", "") + assert f"org.apache.hadoop:hadoop-aws:{hadoop_version}" in packages + assert f"org.apache.hadoop:hadoop-common:{hadoop_version}" in packages + + +@pytest.mark.slow_test +def test_image_resolver_returns_uri_for_installed_pyspark(): + """Verify the image resolver returns a valid URI for the installed PySpark + Python version.""" + import sys + + spark_major_minor = ".".join(pyspark.__version__.split(".")[:2]) + py_version = f"py{sys.version_info[0]}{sys.version_info[1]}" + + supported_py = SPARK_IMAGE_SUPPORT_MATRIX.get(spark_major_minor) + if supported_py is None or py_version not in supported_py: + pytest.skip( + f"Spark {spark_major_minor} + {py_version} not in support matrix; " + f"skipping image resolver test" + ) + + session = Mock(boto_region_name="us-west-2") + image_uri = _get_spark_image_uri(session) + + assert "sagemaker-spark-processing" in image_uri + assert spark_major_minor in image_uri diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py index 09fade48ed..1e0ab6b4c5 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py @@ -14,6 +14,7 @@ from __future__ import absolute_import import feature_store_pyspark +import pyspark import pytest from mock import Mock, patch, call @@ -72,10 +73,12 @@ def test_spark_session_factory_configuration(): # Verify configurations when not running on a training job assert ",".join(feature_store_pyspark.classpath_jars()) in spark_configs.get("spark.jars") + from sagemaker.mlops.feature_store.feature_processor._spark_factory import _get_hadoop_version + hadoop_version = _get_hadoop_version() assert ",".join( [ - "org.apache.hadoop:hadoop-aws:3.3.1", - "org.apache.hadoop:hadoop-common:3.3.1", + f"org.apache.hadoop:hadoop-aws:{hadoop_version}", + f"org.apache.hadoop:hadoop-common:{hadoop_version}", ] ) in spark_configs.get("spark.jars.packages") @@ -88,9 +91,11 @@ def test_spark_session_factory_configuration_on_training_job(): spark_config = spark_session_factory._get_spark_configs(is_training_job=True) assert dict(spark_config).get("spark.test.key") == "spark.test.value" - assert all(tup[0] != "spark.jars" for tup in spark_config) assert all(tup[0] != "spark.jars.packages" for tup in spark_config) + # spark.jars should always be present (Feature Store JARs are always on the classpath) + assert ",".join(feature_store_pyspark.classpath_jars()) in dict(spark_config).get("spark.jars") + @patch("pyspark.context.SparkContext.getOrCreate") def test_spark_session_factory(mock_spark_context): @@ -173,3 +178,34 @@ def test_spark_session_factory_get_spark_session_with_iceberg_config(env_helper) == "smfs.shaded.org.apache.iceberg.aws.s3.S3FileIO" ) assert iceberg_configs.get("spark.sql.catalog.catalog.glue.skip-name-validation") == "true" + + +@pytest.mark.parametrize( + "spark_version,expected_hadoop", + [ + ("3.1.3", "3.2.0"), + ("3.2.2", "3.3.1"), + ("3.3.2", "3.3.2"), + ("3.4.1", "3.3.4"), + ("3.5.1", "3.3.4"), + ], +) +def test_get_hadoop_version(spark_version, expected_hadoop): + with patch.object(pyspark, "__version__", spark_version): + from sagemaker.mlops.feature_store.feature_processor._spark_factory import _get_hadoop_version + assert _get_hadoop_version() == expected_hadoop + + +def test_get_hadoop_version_unknown_falls_back(): + with patch.object(pyspark, "__version__", "3.6.0"): + from sagemaker.mlops.feature_store.feature_processor._spark_factory import _get_hadoop_version + assert _get_hadoop_version() == "3.3.4" + + +def test_spark_configs_use_dynamic_hadoop_version(): + with patch.object(pyspark, "__version__", "3.5.1"): + env_helper = Mock() + factory = SparkSessionFactory(env_helper) + configs = dict(factory._get_spark_configs(is_training_job=False)) + assert "org.apache.hadoop:hadoop-aws:3.3.4" in configs.get("spark.jars.packages") + assert "org.apache.hadoop:hadoop-common:3.3.4" in configs.get("spark.jars.packages") From 544fd9464c9cbb2e26cc5845ef6eec0ccc06e256 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Wed, 29 Apr 2026 17:52:45 -0700 Subject: [PATCH 06/15] feat(remote-function): Support Python 3.12 and auto-detect Spark version Update _get_default_spark_image to accept Python 3.12 in addition to 3.9. Auto-detect Spark version from installed pyspark instead of hardcoding 3.3, falling back to the default if pyspark is not installed. Also resolve correct Python binary in Spark bootstrap script to avoid PATH conflicts with system python3. --- X-AI-Prompt: fix job.py to select correct spark image for py312 and detect pyspark version X-AI-Tool: kiro-cli --- .../src/sagemaker/core/remote_function/job.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sagemaker-core/src/sagemaker/core/remote_function/job.py b/sagemaker-core/src/sagemaker/core/remote_function/job.py index b2161f428d..859b99ba42 100644 --- a/sagemaker-core/src/sagemaker/core/remote_function/job.py +++ b/sagemaker-core/src/sagemaker/core/remote_function/job.py @@ -822,15 +822,23 @@ def _get_default_spark_image(session): py_version = str(sys.version_info[0]) + str(sys.version_info[1]) - if py_version not in ["39"]: + if py_version not in ["39", "312"]: raise ValueError( - "The SageMaker Spark image for remote job only supports Python version 3.9. " + "The SageMaker Spark image for remote job only supports Python versions 3.9 and 3.12." ) + # Detect Spark version from installed pyspark, fall back to default + spark_version = DEFAULT_SPARK_VERSION + try: + import pyspark + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + except ImportError: + pass + image_uri = image_uris.retrieve( framework=SPARK_NAME, region=region, - version=DEFAULT_SPARK_VERSION, + version=spark_version, instance_type=None, py_version=f"py{py_version}", container_version=DEFAULT_SPARK_CONTAINER_VERSION, From 932c9975c200951f3b8bbe24b8619186392e1908 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Thu, 30 Apr 2026 15:25:43 -0700 Subject: [PATCH 07/15] Add Integ test (WIP) --- .../test_feature_processor_integ.py | 180 ++++++++++++++++-- 1 file changed, 161 insertions(+), 19 deletions(-) diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py index 6994971f22..c3811002cb 100644 --- a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py @@ -40,7 +40,9 @@ from sagemaker.mlops.feature_store import ( FeatureGroup, FeatureDefinition, + FeatureGroupManager, FeatureTypeEnum, + LakeFormationConfig, OnlineStoreConfig, OfflineStoreConfig, S3StorageConfig, @@ -527,10 +529,6 @@ def transform(raw_s3_data_as_df): @pytest.mark.slow_test -@pytest.mark.skipif( - not sys.version.startswith("3.9"), - reason="Only allow this test to run with py39", -) def test_feature_processor_transform_offline_only_store_ingestion_run_with_remote( sagemaker_session, pre_execution_commands, @@ -667,10 +665,6 @@ def transform(raw_s3_data_as_df): @pytest.mark.slow_test -@pytest.mark.skipif( - not sys.version.startswith("3.9"), - reason="Only allow this test to run with py39", -) def test_to_pipeline_and_execute( sagemaker_session, pre_execution_commands, @@ -789,11 +783,151 @@ def transform(raw_s3_data_as_df): # cleanup_pipeline(pipeline_name="pipeline-name-01", sagemaker_session=sagemaker_session) +def test_to_pipeline_and_execute_with_lake_formation( + sagemaker_session, + pre_execution_commands, + dependencies_path, +): + pipeline_name = "pipeline-name-lf-01" + car_data_feature_group_name = get_car_data_feature_group_name() + car_data_fg = None + try: + offline_store_s3_uri = get_offline_store_s3_uri(sagemaker_session=sagemaker_session) + role_arn = get_execution_role(sagemaker_session) + + lake_formation_config = LakeFormationConfig( + enabled=True, + hybrid_access_mode_enabled=False, + use_service_linked_role=False, + registration_role_arn=role_arn, + acknowledge_risk=True, + ) + car_data_fg = FeatureGroupManager.create( + feature_group_name=car_data_feature_group_name, + record_identifier_feature_name="id", + event_time_feature_name="ingest_time", + feature_definitions=CAR_SALES_FG_FEATURE_DEFINITIONS, + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"{offline_store_s3_uri}/car-data") + ), + online_store_config=OnlineStoreConfig(enable_online_store=True), + role_arn=role_arn, + lake_formation_config=lake_formation_config, + ) + car_data_arn = car_data_fg.feature_group_arn + + raw_data_uri = get_raw_car_data_s3_uri(sagemaker_session=sagemaker_session) + + @remote( + pre_execution_commands=pre_execution_commands, + dependencies=dependencies_path, + spark_config=SparkConfig(), + instance_type="ml.m5.xlarge", + ) + @feature_processor( + inputs=[CSVDataSource(raw_data_uri)], + output=car_data_arn, + target_stores=["OfflineStore"], + use_lake_formation_credentials=True, + ) + def transform(raw_s3_data_as_df): + """Load data from S3, perform basic feature engineering, store it in a Feature Group""" + from pyspark.sql.functions import regexp_replace + from pyspark.sql.functions import lit + + transformed_df = ( + raw_s3_data_as_df + # Rename Columns + .withColumnRenamed("Id", "id") + .withColumnRenamed("Model", "model") + .withColumnRenamed("Year", "model_year") + .withColumnRenamed("Status", "status") + .withColumnRenamed("Mileage", "mileage") + .withColumnRenamed("Price", "price") + .withColumnRenamed("MSRP", "msrp") + # Add Event Time + .withColumn("ingest_time", lit(int(time.time()))) + # Remove punctuation and fluff; replace with NA + .withColumn("Price", regexp_replace("Price", "\$", "")) # noqa: W605 + .withColumn("mileage", regexp_replace("mileage", "(,)|(mi\.)", "")) # noqa: W605 + .withColumn("mileage", regexp_replace("mileage", "Not available", "NA")) + .withColumn("price", regexp_replace("price", ",", "")) + .withColumn("msrp", regexp_replace("msrp", "(^MSRP\s\\$)|(,)", "")) # noqa: W605 + .withColumn("msrp", regexp_replace("msrp", "Not specified", "NA")) + .withColumn("msrp", regexp_replace("msrp", "\\$\d+[a-zA-Z\s]+", "NA")) # noqa: W605 + .withColumn("model", regexp_replace("model", "^\d\d\d\d\s", "")) # noqa: W605 + ) + + transformed_df.show() + return transformed_df + + _wait_for_feature_group_lineage_contexts( + car_data_feature_group_name, sagemaker_session + ) + + pipeline_arn = to_pipeline( + pipeline_name=pipeline_name, + step=transform, + role_arn=role_arn, + max_retries=2, + tags=[("integ_test_tag_key_1", "integ_test_tag_key_2")], + sagemaker_session=sagemaker_session, + ) + _sagemaker_client = get_sagemaker_client(sagemaker_session=sagemaker_session) + + assert pipeline_arn is not None + + tags = _sagemaker_client.list_tags(ResourceArn=pipeline_arn)["Tags"] + + tag_keys = [tag["Key"] for tag in tags] + assert "integ_test_tag_key_1" in tag_keys + + pipeline_description = Pipeline(name=pipeline_name).describe() + assert pipeline_arn == pipeline_description["PipelineArn"] + assert role_arn == pipeline_description["RoleArn"] + + pipeline_definition = json.loads(pipeline_description["PipelineDefinition"]) + assert len(pipeline_definition["Steps"]) == 1 + for retry_policy in pipeline_definition["Steps"][0]["RetryPolicies"]: + assert retry_policy["MaxAttempts"] == 2 + + pipeline_execution_arn = execute( + pipeline_name=pipeline_name, sagemaker_session=sagemaker_session + ) + + status = _wait_for_pipeline_execution_to_reach_terminal_state( + pipeline_execution_arn=pipeline_execution_arn, + sagemaker_client=_sagemaker_client, + ) + assert status == "Succeeded" + + finally: + if car_data_fg is not None: + cleanup_offline_store( + feature_group=car_data_fg, + sagemaker_session=sagemaker_session, + ) + try: + car_data_fg.refresh() + data_catalog_config = car_data_fg.offline_store_config.data_catalog_config + boto3_session = sagemaker_session.boto_session + glue_client = boto3_session.client( + "glue", region_name=sagemaker_session.boto_region_name + ) + glue_client.delete_table( + DatabaseName=data_catalog_config.database, + Name=data_catalog_config.table_name, + ) + logging.info( + f"Deleted Glue table {data_catalog_config.database}.{data_catalog_config.table_name}" + ) + raise Exception('noop') + except Exception as e: + logging.warning(f"Failed to delete Glue table: {e}") + cleanup_feature_group(car_data_fg, sagemaker_session=sagemaker_session) + + @pytest.mark.slow_test -@pytest.mark.skipif( - not sys.version.startswith("3.9"), - reason="Only allow this test to run with py39", -) def test_schedule_and_event_trigger( sagemaker_session, pre_execution_commands, @@ -1076,14 +1210,22 @@ def get_pre_execution_commands(sagemaker_session): """Build SDK wheels, upload to S3, and return pre-execution install commands.""" s3_prefix, wheel_names = get_wheel_file_s3_uri(sagemaker_session=sagemaker_session) sagemaker_whl, core_whl, mlops_whl = wheel_names - print(f'{sagemaker_whl=}, {core_whl=}, {mlops_whl}') - return [ - f"aws s3 cp {s3_prefix}/ /tmp/packages/ --recursive", - "pip3 install 'setuptools<75'", - f"pip3 install --no-build-isolation '/tmp/packages/{sagemaker_whl}[feature-processor]' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", - f"pip3 install --no-deps --force-reinstall /tmp/packages/{sagemaker_whl}", - f"pip3 install --no-deps --force-reinstall /tmp/packages/{core_whl} /tmp/packages/{mlops_whl}", + print(f'{sagemaker_whl=}, {core_whl=}, {mlops_whl=}') + PIP = "python3.12 -m pip install --root-user-action=ignore" + AWS = "python3.12 -m awscli" + cmds = [ + f"{PIP} awscli", + f"{AWS} s3 cp {s3_prefix}/ /tmp/packages/ --recursive", + f"{PIP} 'setuptools<75'", + f"{PIP} --no-build-isolation '/tmp/packages/{mlops_whl}[feature-processor]' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", + f"{PIP} --no-deps --force-reinstall /tmp/packages/{sagemaker_whl}", + f"{PIP} --no-deps --force-reinstall /tmp/packages/{core_whl} /tmp/packages/{mlops_whl}", + f"{PIP} 'sagemaker-feature-store-pyspark==2.0.0'", + # Copy only the Spark-version-matched Feature Store JAR to avoid classpath conflicts + "cp /usr/local/lib/python3.12/site-packages/feature_store_pyspark/jars/sagemaker-feature-store-spark-sdk-3.5.jar /usr/lib/spark/jars/", ] + print(cmds) + return cmds def create_feature_groups( From 5eca24b11a0465721928f5ed9341422b6170ab40 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Thu, 30 Apr 2026 20:38:42 -0700 Subject: [PATCH 08/15] fix(sagemaker-core): Update Spark image error message to include Python 3.12 Update expected error message in remote function tests to reflect that SageMaker Spark images now support Python versions 3.9 and 3.12. --- sagemaker-core/tests/unit/remote_function/test_job.py | 2 +- .../tests/unit/remote_function/test_job_comprehensive.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sagemaker-core/tests/unit/remote_function/test_job.py b/sagemaker-core/tests/unit/remote_function/test_job.py index db9c6ff5d0..67ebb96b51 100644 --- a/sagemaker-core/tests/unit/remote_function/test_job.py +++ b/sagemaker-core/tests/unit/remote_function/test_job.py @@ -133,7 +133,7 @@ def test_get_default_spark_image_unsupported_python_raises_error(self, mock_sess with patch.object(sys, "version_info", (3, 8, 0)): with pytest.raises( ValueError, - match="SageMaker Spark image for remote job only supports Python version 3.9", + match="SageMaker Spark image for remote job only supports Python versions 3.9 and 3.12", ): _JobSettings._get_default_spark_image(mock_session) diff --git a/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py b/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py index 8555a79469..27b78a50bf 100644 --- a/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py +++ b/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py @@ -131,7 +131,7 @@ def test_get_default_spark_image_unsupported_python(self, mock_session): with patch.object(sys, "version_info", (3, 8, 0)): with pytest.raises( ValueError, - match="SageMaker Spark image for remote job only supports Python version 3.9", + match="SageMaker Spark image for remote job only supports Python versions 3.9 and 3.12", ): _JobSettings._get_default_spark_image(mock_session) From 6a9e051f40490f747cb662ffa77cf00eddfe0b58 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Thu, 30 Apr 2026 20:41:08 -0700 Subject: [PATCH 09/15] chore(sagemaker-mlops): Add pyspark 3.5.1 to test and feature-processor deps Pin pyspark==3.5.1 in both feature-processor and test optional dependencies to ensure consistent Spark version across environments. --- sagemaker-mlops/pyproject.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sagemaker-mlops/pyproject.toml b/sagemaker-mlops/pyproject.toml index e41eacf89a..41f2a8bc61 100644 --- a/sagemaker-mlops/pyproject.toml +++ b/sagemaker-mlops/pyproject.toml @@ -35,8 +35,8 @@ dependencies = [ [project.optional-dependencies] feature-processor = [ - "pyspark==3.3.2", - "sagemaker-feature-store-pyspark-3.3", + "pyspark==3.5.1", + "sagemaker-feature-store-pyspark==2.0.0", "setuptools<82", ] @@ -45,8 +45,8 @@ test = [ "pytest-cov", "mock", "setuptools<82", - "pyspark==3.3.2", - "sagemaker-feature-store-pyspark-3.3", + "pyspark==3.5.1", + "sagemaker-feature-store-pyspark==2.0.0", "pandas<3.0", "numpy<3.0", ] From eb1eaf853427f509a5d94232ee87ec220ae50056 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Thu, 30 Apr 2026 21:31:38 -0700 Subject: [PATCH 10/15] test(sagemaker-mlops): Skip Spark integ tests on unsupported Python versions SageMaker Spark image only supports Python 3.9 and 3.12. Add skipif markers to three feature processor integ tests that fail on Python 3.10. --- .../test_feature_processor_integ.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py index c3811002cb..7cb93e7765 100644 --- a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py @@ -664,6 +664,10 @@ def transform(raw_s3_data_as_df): ) +@pytest.mark.skipif( + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", +) @pytest.mark.slow_test def test_to_pipeline_and_execute( sagemaker_session, @@ -783,6 +787,10 @@ def transform(raw_s3_data_as_df): # cleanup_pipeline(pipeline_name="pipeline-name-01", sagemaker_session=sagemaker_session) +@pytest.mark.skipif( + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", +) def test_to_pipeline_and_execute_with_lake_formation( sagemaker_session, pre_execution_commands, @@ -821,8 +829,9 @@ def test_to_pipeline_and_execute_with_lake_formation( @remote( pre_execution_commands=pre_execution_commands, dependencies=dependencies_path, - spark_config=SparkConfig(), + # spark_config=SparkConfig(), instance_type="ml.m5.xlarge", + image_uri='550124139430.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:latest' ) @feature_processor( inputs=[CSVDataSource(raw_data_uri)], @@ -927,6 +936,10 @@ def transform(raw_s3_data_as_df): cleanup_feature_group(car_data_fg, sagemaker_session=sagemaker_session) +@pytest.mark.skipif( + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", +) @pytest.mark.slow_test def test_schedule_and_event_trigger( sagemaker_session, @@ -1220,9 +1233,9 @@ def get_pre_execution_commands(sagemaker_session): f"{PIP} --no-build-isolation '/tmp/packages/{mlops_whl}[feature-processor]' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", f"{PIP} --no-deps --force-reinstall /tmp/packages/{sagemaker_whl}", f"{PIP} --no-deps --force-reinstall /tmp/packages/{core_whl} /tmp/packages/{mlops_whl}", - f"{PIP} 'sagemaker-feature-store-pyspark==2.0.0'", + # f"{PIP} 'sagemaker-feature-store-pyspark==2.0.0'", # Copy only the Spark-version-matched Feature Store JAR to avoid classpath conflicts - "cp /usr/local/lib/python3.12/site-packages/feature_store_pyspark/jars/sagemaker-feature-store-spark-sdk-3.5.jar /usr/lib/spark/jars/", + # "cp /usr/local/lib/python3.12/site-packages/feature_store_pyspark/jars/sagemaker-feature-store-spark-sdk-3.5.jar /usr/lib/spark/jars/", ] print(cmds) return cmds From 2df34bb6eb8d319c0a25a015a91ef66472f22517 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Fri, 1 May 2026 10:20:41 -0700 Subject: [PATCH 11/15] feat(sagemaker-mlops): Auto-install feature-store-pyspark in to_pipeline Inject sagemaker-feature-store-pyspark>=2,<3 via pre_execution_commands in _get_remote_decorator_config_from_input so it gets installed on the remote container automatically. Update integ tests: add skipif for Python 3.10 Spark tests, remove manual feature-store-pyspark install, use python3 instead of python3.12. --- .../feature_processor/feature_scheduler.py | 8 ++++++++ .../test_feature_processor_integ.py | 13 +++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py index e0c57a9afb..9e13e71a87 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py @@ -1048,6 +1048,14 @@ def _get_remote_decorator_config_from_input( if not remote_decorator_config.image_uri: remote_decorator_config.image_uri = _get_spark_image_uri(sagemaker_session) + # Ensure sagemaker-feature-store-pyspark is installed on the remote container + install_cmd = "pip install 'sagemaker-feature-store-pyspark>=2,<3'" + if remote_decorator_config.pre_execution_commands: + if install_cmd not in remote_decorator_config.pre_execution_commands: + remote_decorator_config.pre_execution_commands.append(install_cmd) + else: + remote_decorator_config.pre_execution_commands = [install_cmd] + return remote_decorator_config diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py index 7cb93e7765..dc4bfa3470 100644 --- a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py @@ -528,6 +528,10 @@ def transform(raw_s3_data_as_df): ) +@pytest.mark.skipif( + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", +) @pytest.mark.slow_test def test_feature_processor_transform_offline_only_store_ingestion_run_with_remote( sagemaker_session, @@ -1224,18 +1228,15 @@ def get_pre_execution_commands(sagemaker_session): s3_prefix, wheel_names = get_wheel_file_s3_uri(sagemaker_session=sagemaker_session) sagemaker_whl, core_whl, mlops_whl = wheel_names print(f'{sagemaker_whl=}, {core_whl=}, {mlops_whl=}') - PIP = "python3.12 -m pip install --root-user-action=ignore" - AWS = "python3.12 -m awscli" + PIP = "python3 -m pip install --root-user-action=ignore" + AWS = "python3 -m awscli" cmds = [ f"{PIP} awscli", f"{AWS} s3 cp {s3_prefix}/ /tmp/packages/ --recursive", f"{PIP} 'setuptools<75'", - f"{PIP} --no-build-isolation '/tmp/packages/{mlops_whl}[feature-processor]' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", + f"{PIP} --no-build-isolation '/tmp/packages/{mlops_whl}' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", f"{PIP} --no-deps --force-reinstall /tmp/packages/{sagemaker_whl}", f"{PIP} --no-deps --force-reinstall /tmp/packages/{core_whl} /tmp/packages/{mlops_whl}", - # f"{PIP} 'sagemaker-feature-store-pyspark==2.0.0'", - # Copy only the Spark-version-matched Feature Store JAR to avoid classpath conflicts - # "cp /usr/local/lib/python3.12/site-packages/feature_store_pyspark/jars/sagemaker-feature-store-spark-sdk-3.5.jar /usr/lib/spark/jars/", ] print(cmds) return cmds From 6f08b0619e12821cb4bac33f717b0d3be77eff63 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Fri, 1 May 2026 11:34:30 -0700 Subject: [PATCH 12/15] feat(sagemaker-mlops): Add Python 3.12 Spark support and auto-install feature-store-pyspark - Update test error messages to reflect Python 3.9 and 3.12 support - Add pyspark 3.5.1 to test and feature-processor optional deps - Skip Spark integ tests on unsupported Python versions (3.10) - Auto-install sagemaker-feature-store-pyspark>=2,<3 via pre_execution_commands in to_pipeline and copy version-matched JAR to Spark classpath - Use standard SageMaker Spark image resolution via SparkConfig - Use python3 instead of python3.12 in integ test pre_execution_commands --- .../feature_processor/feature_scheduler.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py index 9e13e71a87..c8688d0fc8 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py @@ -18,6 +18,8 @@ from datetime import datetime from typing import Callable, List, Optional, Dict, Sequence, Union, Any, Tuple +import pyspark + import pytz from botocore.exceptions import ClientError @@ -1049,12 +1051,22 @@ def _get_remote_decorator_config_from_input( remote_decorator_config.image_uri = _get_spark_image_uri(sagemaker_session) # Ensure sagemaker-feature-store-pyspark is installed on the remote container + # and its Spark-version-matched JAR is on Spark's classpath at startup install_cmd = "pip install 'sagemaker-feature-store-pyspark>=2,<3'" + spark_major_minor = pyspark.__version__.rsplit(".", 1)[0] + copy_jar_cmd = ( + "python3 -c \"" + "import feature_store_pyspark, shutil, glob, os; " + f"jars = glob.glob(os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars', '*{spark_major_minor}*')); " + "[shutil.copy(j, '/usr/lib/spark/jars/') for j in jars]" + "\"" + ) if remote_decorator_config.pre_execution_commands: if install_cmd not in remote_decorator_config.pre_execution_commands: remote_decorator_config.pre_execution_commands.append(install_cmd) + remote_decorator_config.pre_execution_commands.append(copy_jar_cmd) else: - remote_decorator_config.pre_execution_commands = [install_cmd] + remote_decorator_config.pre_execution_commands = [install_cmd, copy_jar_cmd] return remote_decorator_config From 50c87f0b5c8add576313d79041049ddb08b0e3f2 Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Fri, 1 May 2026 14:38:08 -0700 Subject: [PATCH 13/15] feat(feature-processor): Auto-install feature-store-pyspark for Spark remote jobs When spark_config is set on a remote job, _JobSettings now automatically injects pip install of sagemaker-feature-store-pyspark and copies the Spark 3.5-matched JAR to /usr/lib/spark/jars/ via pre_execution_commands. This makes the package work transparently when the SageMaker Spark image does not pre-install sagemaker-feature-store-pyspark. - Make feature_store_pyspark imports lazy in _spark_factory.py to avoid deserialization failures when the module is not yet installed - Add sagemaker-feature-store-pyspark to integ test requirements.txt - Remove duplicate injection from feature_scheduler.py (to_pipeline path) since _JobSettings now handles all Spark remote jobs --- .../src/sagemaker/core/remote_function/job.py | 19 +++++++++ .../feature_processor/_spark_factory.py | 39 +++++++++++++++++-- .../feature_processor/feature_scheduler.py | 20 ---------- .../feature_processor/requirements.txt | 3 +- 4 files changed, 56 insertions(+), 25 deletions(-) diff --git a/sagemaker-core/src/sagemaker/core/remote_function/job.py b/sagemaker-core/src/sagemaker/core/remote_function/job.py index 859b99ba42..b2d725bb92 100644 --- a/sagemaker-core/src/sagemaker/core/remote_function/job.py +++ b/sagemaker-core/src/sagemaker/core/remote_function/job.py @@ -670,6 +670,25 @@ def __init__( sagemaker_session=self.sagemaker_session, ) + # When using Spark, ensure sagemaker-feature-store-pyspark is installed + # and its version-matched JAR is on Spark's classpath before spark-submit + if spark_config: + install_cmd = ( + "pip install --root-user-action=ignore" + " 'sagemaker-feature-store-pyspark>=2,<3'" + ) + copy_jar_cmd = ( + "python3 -c \"import feature_store_pyspark, shutil, os, glob; " + "jars_dir = os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars'); " + "[shutil.copy(j, '/usr/lib/spark/jars/') " + "for j in glob.glob(os.path.join(jars_dir, '*3.5*.jar'))]\"" + ) + if self.pre_execution_commands is None: + self.pre_execution_commands = [install_cmd, copy_jar_cmd] + elif install_cmd not in self.pre_execution_commands: + self.pre_execution_commands.append(install_cmd) + self.pre_execution_commands.append(copy_jar_cmd) + self.pre_execution_script = resolve_value_from_config( direct_input=pre_execution_script, config_path=REMOTE_FUNCTION_PRE_EXECUTION_SCRIPT, diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py index 43a507a993..7b77c2d076 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py @@ -17,9 +17,7 @@ from functools import lru_cache from typing import List, Tuple, Dict -import feature_store_pyspark import pyspark -import feature_store_pyspark.FeatureStoreManager as fsm from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql import SparkSession @@ -83,6 +81,10 @@ def spark_session(self) -> SparkSession: is_training_job = self.environment_helper.is_training_job() instance_count = self.environment_helper.get_instance_count() + # Copy version-matched Feature Store JAR to Spark's system classpath + # so it's available to the JVM even if SparkContext is already running. + self._install_feature_store_jars() + spark_configs = self._get_spark_configs(is_training_job) spark_conf = SparkConf().setAll(spark_configs).setAppName(SPARK_APP_NAME) @@ -97,6 +99,24 @@ def spark_session(self) -> SparkSession: return SparkSession(sparkContext=sc) + @staticmethod + def _install_feature_store_jars(): + """Copy the Spark-version-matched Feature Store JAR to Spark's system classpath.""" + import feature_store_pyspark + import shutil + import os + + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + target_dir = "/usr/lib/spark/jars" + if not os.path.isdir(target_dir): + return + for jar in feature_store_pyspark.classpath_jars(): + if spark_version in os.path.basename(jar): + dest = os.path.join(target_dir, os.path.basename(jar)) + if not os.path.exists(dest): + shutil.copy(jar, dest) + logger.info("Copied %s to %s", jar, target_dir) + def _get_spark_configs(self, is_training_job) -> List[Tuple[str, str]]: """Generate Spark Configurations optimized for feature_processing functionality. @@ -158,7 +178,16 @@ def _get_spark_configs(self, is_training_job) -> List[Tuple[str, str]]: # Always add Feature Store JARs so they are on the classpath # regardless of whether we are in a training job or not. - fp_spark_jars = feature_store_pyspark.classpath_jars() + import feature_store_pyspark + import os + + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + fp_spark_jars = [ + j for j in feature_store_pyspark.classpath_jars() + if spark_version in os.path.basename(j) + ] + if not fp_spark_jars: + fp_spark_jars = feature_store_pyspark.classpath_jars() if self.spark_config and "spark.jars" in self.spark_config: fp_spark_jars.append(self.spark_config.get("spark.jars")) @@ -225,6 +254,8 @@ class FeatureStoreManagerFactory: @property @lru_cache() - def feature_store_manager(self) -> fsm.FeatureStoreManager: + def feature_store_manager(self) -> "fsm.FeatureStoreManager": """Instansiate a new FeatureStoreManager.""" + import feature_store_pyspark.FeatureStoreManager as fsm + return fsm.FeatureStoreManager() diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py index c8688d0fc8..e0c57a9afb 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py @@ -18,8 +18,6 @@ from datetime import datetime from typing import Callable, List, Optional, Dict, Sequence, Union, Any, Tuple -import pyspark - import pytz from botocore.exceptions import ClientError @@ -1050,24 +1048,6 @@ def _get_remote_decorator_config_from_input( if not remote_decorator_config.image_uri: remote_decorator_config.image_uri = _get_spark_image_uri(sagemaker_session) - # Ensure sagemaker-feature-store-pyspark is installed on the remote container - # and its Spark-version-matched JAR is on Spark's classpath at startup - install_cmd = "pip install 'sagemaker-feature-store-pyspark>=2,<3'" - spark_major_minor = pyspark.__version__.rsplit(".", 1)[0] - copy_jar_cmd = ( - "python3 -c \"" - "import feature_store_pyspark, shutil, glob, os; " - f"jars = glob.glob(os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars', '*{spark_major_minor}*')); " - "[shutil.copy(j, '/usr/lib/spark/jars/') for j in jars]" - "\"" - ) - if remote_decorator_config.pre_execution_commands: - if install_cmd not in remote_decorator_config.pre_execution_commands: - remote_decorator_config.pre_execution_commands.append(install_cmd) - remote_decorator_config.pre_execution_commands.append(copy_jar_cmd) - else: - remote_decorator_config.pre_execution_commands = [install_cmd, copy_jar_cmd] - return remote_decorator_config diff --git a/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt b/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt index 2940fa2876..7d1e1f6239 100644 --- a/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt +++ b/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt @@ -1 +1,2 @@ -# unrelased sagemaker is installed via pre_execution_commands \ No newline at end of file +# unrelased sagemaker is installed via pre_execution_commands +sagemaker-feature-store-pyspark>=2,<3 \ No newline at end of file From 08a884f6389fdfd00af1a19463c8b7f4f5588bac Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Fri, 1 May 2026 14:50:02 -0700 Subject: [PATCH 14/15] test(feature-processor): Update test_to_pipeline to match injected pre_execution_commands _JobSettings now auto-injects feature-store-pyspark install and JAR copy commands when spark_config is set, so update the test assertion to expect these commands in the _prepare_and_upload_workspace call. --- .../feature_processor/test_feature_scheduler.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py index 9173ea0421..3a085bbd68 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py @@ -302,7 +302,15 @@ def test_to_pipeline( mock_dependency_upload.assert_called_once_with( "/tmp/snapshot", True, - None, + [ + "pip install --root-user-action=ignore 'sagemaker-feature-store-pyspark>=2,<3'", + ( + "python3 -c \"import feature_store_pyspark, shutil, os, glob; " + "jars_dir = os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars'); " + "[shutil.copy(j, '/usr/lib/spark/jars/') " + "for j in glob.glob(os.path.join(jars_dir, '*3.5*.jar'))]\"" + ), + ], None, f"{S3_URI}/pipeline_name", None, From 609c0e6125ceba73b6b271219c0109010820525b Mon Sep 17 00:00:00 2001 From: BassemHalim Date: Fri, 1 May 2026 17:00:09 -0700 Subject: [PATCH 15/15] fix --- sagemaker-core/src/sagemaker/core/remote_function/job.py | 5 ++--- .../feature_processor/test_feature_processor_integ.py | 5 ++--- .../feature_processor/test_feature_scheduler.py | 5 ++--- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/sagemaker-core/src/sagemaker/core/remote_function/job.py b/sagemaker-core/src/sagemaker/core/remote_function/job.py index b2d725bb92..bfccf54946 100644 --- a/sagemaker-core/src/sagemaker/core/remote_function/job.py +++ b/sagemaker-core/src/sagemaker/core/remote_function/job.py @@ -678,10 +678,9 @@ def __init__( " 'sagemaker-feature-store-pyspark>=2,<3'" ) copy_jar_cmd = ( - "python3 -c \"import feature_store_pyspark, shutil, os, glob; " - "jars_dir = os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars'); " + "python3 -c \"import feature_store_pyspark, shutil; " "[shutil.copy(j, '/usr/lib/spark/jars/') " - "for j in glob.glob(os.path.join(jars_dir, '*3.5*.jar'))]\"" + "for j in feature_store_pyspark.classpath_jars()]\"" ) if self.pre_execution_commands is None: self.pre_execution_commands = [install_cmd, copy_jar_cmd] diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py index dc4bfa3470..f3b0864f1a 100644 --- a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py @@ -795,6 +795,7 @@ def transform(raw_s3_data_as_df): sys.version_info[:2] not in [(3, 9), (3, 12)], reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", ) +@pytest.mark.slow_test def test_to_pipeline_and_execute_with_lake_formation( sagemaker_session, pre_execution_commands, @@ -833,9 +834,8 @@ def test_to_pipeline_and_execute_with_lake_formation( @remote( pre_execution_commands=pre_execution_commands, dependencies=dependencies_path, - # spark_config=SparkConfig(), + spark_config=SparkConfig(), instance_type="ml.m5.xlarge", - image_uri='550124139430.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:latest' ) @feature_processor( inputs=[CSVDataSource(raw_data_uri)], @@ -934,7 +934,6 @@ def transform(raw_s3_data_as_df): logging.info( f"Deleted Glue table {data_catalog_config.database}.{data_catalog_config.table_name}" ) - raise Exception('noop') except Exception as e: logging.warning(f"Failed to delete Glue table: {e}") cleanup_feature_group(car_data_fg, sagemaker_session=sagemaker_session) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py index 3a085bbd68..8d62df3716 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py @@ -305,10 +305,9 @@ def test_to_pipeline( [ "pip install --root-user-action=ignore 'sagemaker-feature-store-pyspark>=2,<3'", ( - "python3 -c \"import feature_store_pyspark, shutil, os, glob; " - "jars_dir = os.path.join(os.path.dirname(feature_store_pyspark.__file__), 'jars'); " + "python3 -c \"import feature_store_pyspark, shutil; " "[shutil.copy(j, '/usr/lib/spark/jars/') " - "for j in glob.glob(os.path.join(jars_dir, '*3.5*.jar'))]\"" + "for j in feature_store_pyspark.classpath_jars()]\"" ), ], None,