diff --git a/sagemaker-core/src/sagemaker/core/remote_function/job.py b/sagemaker-core/src/sagemaker/core/remote_function/job.py index b2161f428d..bfccf54946 100644 --- a/sagemaker-core/src/sagemaker/core/remote_function/job.py +++ b/sagemaker-core/src/sagemaker/core/remote_function/job.py @@ -670,6 +670,24 @@ def __init__( sagemaker_session=self.sagemaker_session, ) + # When using Spark, ensure sagemaker-feature-store-pyspark is installed + # and its version-matched JAR is on Spark's classpath before spark-submit + if spark_config: + install_cmd = ( + "pip install --root-user-action=ignore" + " 'sagemaker-feature-store-pyspark>=2,<3'" + ) + copy_jar_cmd = ( + "python3 -c \"import feature_store_pyspark, shutil; " + "[shutil.copy(j, '/usr/lib/spark/jars/') " + "for j in feature_store_pyspark.classpath_jars()]\"" + ) + if self.pre_execution_commands is None: + self.pre_execution_commands = [install_cmd, copy_jar_cmd] + elif install_cmd not in self.pre_execution_commands: + self.pre_execution_commands.append(install_cmd) + self.pre_execution_commands.append(copy_jar_cmd) + self.pre_execution_script = resolve_value_from_config( direct_input=pre_execution_script, config_path=REMOTE_FUNCTION_PRE_EXECUTION_SCRIPT, @@ -822,15 +840,23 @@ def _get_default_spark_image(session): py_version = str(sys.version_info[0]) + str(sys.version_info[1]) - if py_version not in ["39"]: + if py_version not in ["39", "312"]: raise ValueError( - "The SageMaker Spark image for remote job only supports Python version 3.9. " + "The SageMaker Spark image for remote job only supports Python versions 3.9 and 3.12." ) + # Detect Spark version from installed pyspark, fall back to default + spark_version = DEFAULT_SPARK_VERSION + try: + import pyspark + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + except ImportError: + pass + image_uri = image_uris.retrieve( framework=SPARK_NAME, region=region, - version=DEFAULT_SPARK_VERSION, + version=spark_version, instance_type=None, py_version=f"py{py_version}", container_version=DEFAULT_SPARK_CONTAINER_VERSION, diff --git a/sagemaker-core/tests/unit/remote_function/test_job.py b/sagemaker-core/tests/unit/remote_function/test_job.py index db9c6ff5d0..67ebb96b51 100644 --- a/sagemaker-core/tests/unit/remote_function/test_job.py +++ b/sagemaker-core/tests/unit/remote_function/test_job.py @@ -133,7 +133,7 @@ def test_get_default_spark_image_unsupported_python_raises_error(self, mock_sess with patch.object(sys, "version_info", (3, 8, 0)): with pytest.raises( ValueError, - match="SageMaker Spark image for remote job only supports Python version 3.9", + match="SageMaker Spark image for remote job only supports Python versions 3.9 and 3.12", ): _JobSettings._get_default_spark_image(mock_session) diff --git a/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py b/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py index 8555a79469..27b78a50bf 100644 --- a/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py +++ b/sagemaker-core/tests/unit/remote_function/test_job_comprehensive.py @@ -131,7 +131,7 @@ def test_get_default_spark_image_unsupported_python(self, mock_session): with patch.object(sys, "version_info", (3, 8, 0)): with pytest.raises( ValueError, - match="SageMaker Spark image for remote job only supports Python version 3.9", + match="SageMaker Spark image for remote job only supports Python versions 3.9 and 3.12", ): _JobSettings._get_default_spark_image(mock_session) diff --git a/sagemaker-mlops/pyproject.toml b/sagemaker-mlops/pyproject.toml index 772b41fc99..473247f468 100644 --- a/sagemaker-mlops/pyproject.toml +++ b/sagemaker-mlops/pyproject.toml @@ -35,8 +35,8 @@ dependencies = [ [project.optional-dependencies] feature-processor = [ - "pyspark==3.3.2", - "sagemaker-feature-store-pyspark-3.3", + "pyspark==3.5.1", + "sagemaker-feature-store-pyspark==2.0.0", "setuptools<82", ] @@ -45,8 +45,8 @@ test = [ "pytest-cov", "mock", "setuptools<82", - "pyspark==3.3.2", - "sagemaker-feature-store-pyspark-3.3", + "pyspark==3.5.1", + "sagemaker-feature-store-pyspark==2.0.0", "pandas<3.0", "numpy<3.0", ] diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py index d181218fb5..c1db9d19d7 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_config_uploader.py @@ -15,6 +15,8 @@ from typing import Callable, Dict, Optional, Tuple, List, Union import attr +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import serialization as crypto_serialization from sagemaker.core.helper.session_helper import Session from sagemaker.mlops.feature_store.feature_processor._constants import ( @@ -52,13 +54,13 @@ class ConfigUploader: def prepare_step_input_channel_for_spark_mode( self, func: Callable, s3_base_uri: str, sagemaker_session: Session - ) -> Tuple[List[Channel], Dict]: + ) -> Tuple[List[Channel], Dict, str]: """Prepares input channels for SageMaker Pipeline Step. Returns: - Tuple of (List[Channel], spark_dependency_paths dict) + Tuple of (List[Channel], spark_dependency_paths dict, public_key_pem str) """ - self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session) + public_key_pem = self._prepare_and_upload_callable(func, s3_base_uri, sagemaker_session) bootstrap_scripts_s3uri = self._prepare_and_upload_runtime_scripts( self.remote_decorator_config.spark_config, s3_base_uri, @@ -139,18 +141,33 @@ def prepare_step_input_channel_for_spark_mode( SPARK_JAR_FILES_PATH: submit_jars_s3_paths, SPARK_PY_FILES_PATH: submit_py_files_s3_paths, SPARK_FILES_PATH: submit_files_s3_path, - } + }, public_key_pem def _prepare_and_upload_callable( self, func: Callable, s3_base_uri: str, sagemaker_session: Session - ) -> None: - """Prepares and uploads callable to S3""" + ) -> str: + """Prepares and uploads callable to S3. + + Returns: + str: The public key PEM string for signature verification on the remote side. + """ + private_key = ec.generate_private_key(ec.SECP256R1()) + public_key_pem = ( + private_key.public_key() + .public_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PublicFormat.SubjectPublicKeyInfo, + ) + .decode("utf-8") + ) stored_function = StoredFunction( sagemaker_session=sagemaker_session, s3_base_uri=s3_base_uri, + signing_key=private_key, s3_kms_key=self.remote_decorator_config.s3_kms_key, ) stored_function.save(func) + return public_key_pem def _prepare_and_upload_workspace( self, diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py index f5d4dd91f3..1fab16a640 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_feature_processor_config.py @@ -47,6 +47,7 @@ class FeatureProcessorConfig: parameters: Optional[Dict[str, Union[str, Dict]]] = attr.ib() enable_ingestion: bool = attr.ib() spark_config: Dict[str, str] = attr.ib() + use_lake_formation_credentials: bool = attr.ib() @staticmethod def create( @@ -59,6 +60,7 @@ def create( parameters: Optional[Dict[str, Union[str, Dict]]], enable_ingestion: bool, spark_config: Dict[str, str], + use_lake_formation_credentials: bool = False, ) -> "FeatureProcessorConfig": """Static initializer.""" return FeatureProcessorConfig( @@ -69,4 +71,5 @@ def create( parameters=parameters, enable_ingestion=enable_ingestion, spark_config=spark_config, + use_lake_formation_credentials=use_lake_formation_credentials, ) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py new file mode 100644 index 0000000000..c19bc0fdda --- /dev/null +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_image_resolver.py @@ -0,0 +1,65 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Resolves SageMaker Spark container image URIs based on installed PySpark and Python versions.""" +from __future__ import absolute_import + +import sys + +import pyspark + +from sagemaker.core import image_uris + +SPARK_IMAGE_SUPPORT_MATRIX = { + "3.1": ["py37"], + "3.2": ["py39"], + "3.3": ["py39"], + "3.5": ["py39", "py312"], +} + + +def _get_spark_image_uri(session): + """Resolve the SageMaker Spark container image URI for the installed PySpark and Python versions. + + Args: + session: SageMaker Session with boto_region_name attribute. + + Returns: + str: The ECR image URI for the matching Spark container. + + Raises: + ValueError: If the Spark/Python version combination is not supported. + """ + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + py_version = f"py{sys.version_info[0]}{sys.version_info[1]}" + + supported_py = SPARK_IMAGE_SUPPORT_MATRIX.get(spark_version) + if supported_py is None: + supported = ", ".join(sorted(SPARK_IMAGE_SUPPORT_MATRIX.keys())) + raise ValueError( + f"No SageMaker Spark container image available for Spark {spark_version}. " + f"Supported versions for remote execution: {supported}." + ) + + if py_version not in supported_py: + raise ValueError( + f"SageMaker Spark {spark_version} container images support " + f"{', '.join(supported_py)}. Current Python version: {py_version}." + ) + + return image_uris.retrieve( + framework="spark", + region=session.boto_region_name, + version=spark_version, + py_version=py_version, + container_version="v1", + ) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py index d304185e85..7b77c2d076 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_spark_factory.py @@ -13,11 +13,11 @@ """Contains factory classes for instantiating Spark objects.""" from __future__ import absolute_import +import logging from functools import lru_cache from typing import List, Tuple, Dict -import feature_store_pyspark -import feature_store_pyspark.FeatureStoreManager as fsm +import pyspark from pyspark.conf import SparkConf from pyspark.context import SparkContext from pyspark.sql import SparkSession @@ -26,6 +26,32 @@ SPARK_APP_NAME = "FeatureProcessor" +logger = logging.getLogger(__name__) + +SPARK_TO_HADOOP_MAP = { + "3.1": "3.2.0", + "3.2": "3.3.1", + "3.3": "3.3.2", + "3.4": "3.3.4", + "3.5": "3.3.4", +} + +_DEFAULT_HADOOP_VERSION = "3.3.4" + +def _get_hadoop_version(): + """Resolve the Hadoop version for the installed PySpark version.""" + spark_version = pyspark.__version__ + major_minor = ".".join(spark_version.split(".")[:2]) + hadoop_version = SPARK_TO_HADOOP_MAP.get(major_minor) + if hadoop_version is None: + hadoop_version = _DEFAULT_HADOOP_VERSION + logger.warning( + "Unknown Spark version %s. Falling back to Hadoop %s.", + spark_version, + hadoop_version, + ) + return hadoop_version + class SparkSessionFactory: """Lazy loading, memoizing, instantiation of SparkSessions. @@ -55,6 +81,10 @@ def spark_session(self) -> SparkSession: is_training_job = self.environment_helper.is_training_job() instance_count = self.environment_helper.get_instance_count() + # Copy version-matched Feature Store JAR to Spark's system classpath + # so it's available to the JVM even if SparkContext is already running. + self._install_feature_store_jars() + spark_configs = self._get_spark_configs(is_training_job) spark_conf = SparkConf().setAll(spark_configs).setAppName(SPARK_APP_NAME) @@ -69,6 +99,24 @@ def spark_session(self) -> SparkSession: return SparkSession(sparkContext=sc) + @staticmethod + def _install_feature_store_jars(): + """Copy the Spark-version-matched Feature Store JAR to Spark's system classpath.""" + import feature_store_pyspark + import shutil + import os + + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + target_dir = "/usr/lib/spark/jars" + if not os.path.isdir(target_dir): + return + for jar in feature_store_pyspark.classpath_jars(): + if spark_version in os.path.basename(jar): + dest = os.path.join(target_dir, os.path.basename(jar)) + if not os.path.exists(dest): + shutil.copy(jar, dest) + logger.info("Copied %s to %s", jar, target_dir) + def _get_spark_configs(self, is_training_job) -> List[Tuple[str, str]]: """Generate Spark Configurations optimized for feature_processing functionality. @@ -115,28 +163,37 @@ def _get_spark_configs(self, is_training_job) -> List[Tuple[str, str]]: spark_configs.extend(self.spark_config.items()) if not is_training_job: - fp_spark_jars = feature_store_pyspark.classpath_jars() + hadoop_version = _get_hadoop_version() fp_spark_packages = [ - "org.apache.hadoop:hadoop-aws:3.3.1", - "org.apache.hadoop:hadoop-common:3.3.1", + f"org.apache.hadoop:hadoop-aws:{hadoop_version}", + f"org.apache.hadoop:hadoop-common:{hadoop_version}", ] - if self.spark_config and "spark.jars" in self.spark_config: - fp_spark_jars.append(self.spark_config.get("spark.jars")) - if self.spark_config and "spark.jars.packages" in self.spark_config: fp_spark_packages.append(self.spark_config.get("spark.jars.packages")) - spark_configs.extend( - ( - ("spark.jars", ",".join(fp_spark_jars)), - ( - "spark.jars.packages", - ",".join(fp_spark_packages), - ), - ) + spark_configs.append( + ("spark.jars.packages", ",".join(fp_spark_packages)) ) + # Always add Feature Store JARs so they are on the classpath + # regardless of whether we are in a training job or not. + import feature_store_pyspark + import os + + spark_version = ".".join(pyspark.__version__.split(".")[:2]) + fp_spark_jars = [ + j for j in feature_store_pyspark.classpath_jars() + if spark_version in os.path.basename(j) + ] + if not fp_spark_jars: + fp_spark_jars = feature_store_pyspark.classpath_jars() + + if self.spark_config and "spark.jars" in self.spark_config: + fp_spark_jars.append(self.spark_config.get("spark.jars")) + + spark_configs.append(("spark.jars", ",".join(fp_spark_jars))) + return spark_configs def _get_jsc_hadoop_configs(self) -> List[Tuple[str, str]]: @@ -197,6 +254,8 @@ class FeatureStoreManagerFactory: @property @lru_cache() - def feature_store_manager(self) -> fsm.FeatureStoreManager: + def feature_store_manager(self) -> "fsm.FeatureStoreManager": """Instansiate a new FeatureStoreManager.""" + import feature_store_pyspark.FeatureStoreManager as fsm + return fsm.FeatureStoreManager() diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py index a037e837c2..08fc280f46 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/_udf_output_receiver.py @@ -81,6 +81,7 @@ def ingest_udf_output(self, output: DataFrame, fp_config: FeatureProcessorConfig input_data_frame=output, feature_group_arn=fp_config.output, target_stores=fp_config.target_stores, + use_lake_formation_credentials=fp_config.use_lake_formation_credentials ) except Py4JJavaError as e: if e.java_exception.getClass().getSimpleName() == "StreamIngestionFailureException": diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py index 31593a3f1c..214c49109a 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_processor.py @@ -40,6 +40,7 @@ def feature_processor( parameters: Optional[Dict[str, Union[str, Dict]]] = None, enable_ingestion: bool = True, spark_config: Dict[str, str] = None, + use_lake_formation_credentials: bool = False, ) -> Callable: """Decorator to facilitate feature engineering for Feature Groups. @@ -96,6 +97,8 @@ def transform(input_feature_group, input_csv): development phase to ensure that data is not used until the function is ready. It also useful for users that want to manage their own data ingestion. Defaults to True. spark_config (Dict[str, str]): A dict contains the key-value paris for Spark configurations. + use_lake_formation_credentials (bool, optional): Whether to use Lake Formation credential + vending for data ingestion. Defaults to False. Raises: IngestionError: If any rows are not ingested successfully then a sample of the records, @@ -114,6 +117,7 @@ def decorator(udf: Callable[..., Any]) -> Callable: parameters=parameters, enable_ingestion=enable_ingestion, spark_config=spark_config, + use_lake_formation_credentials=use_lake_formation_credentials, ) validator_chain = ValidatorFactory.get_validation_chain(fp_config) diff --git a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py index c18232cb08..e0c57a9afb 100644 --- a/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py +++ b/sagemaker-mlops/src/sagemaker/mlops/feature_store/feature_processor/feature_scheduler.py @@ -98,6 +98,7 @@ FeatureProcessorLineageHandler, TransformationCode, ) +from sagemaker.mlops.feature_store.feature_processor._image_resolver import _get_spark_image_uri from sagemaker.core.remote_function.job import ( _JobSettings, @@ -174,6 +175,7 @@ def to_pipeline( ( input_data_config, spark_dependency_paths, + public_key_pem, ) = config_uploader.prepare_step_input_channel_for_spark_mode( func=getattr(step, "wrapped_func", step), s3_base_uri=s3_base_uri, @@ -194,6 +196,7 @@ def to_pipeline( spark_dependency_paths=spark_dependency_paths, pipeline_session=pipeline_session, role=_role, + public_key_pem=public_key_pem, ) step_args = model_trainer.train(input_data_config=input_data_config) @@ -846,6 +849,7 @@ def _prepare_model_trainer_from_remote_decorator_config( spark_dependency_paths: Dict[str, Optional[str]], pipeline_session: PipelineSession, role: str, + public_key_pem: str = None, ) -> ModelTrainer: """Prepares a ModelTrainer instance from remote decorator configuration. @@ -866,6 +870,8 @@ def _prepare_model_trainer_from_remote_decorator_config( # Build environment dict from remote_decorator_config environment = dict(remote_decorator_config.environment_variables or {}) + if public_key_pem: + environment["REMOTE_FUNCTION_SECRET_KEY"] = public_key_pem # Build command from container entry point and arguments entry_point_and_args = _get_container_entry_point_and_arguments( @@ -1039,7 +1045,8 @@ def _get_remote_decorator_config_from_input( # TODO: This needs to be removed when new mode is introduced. if remote_decorator_config.spark_config is None: remote_decorator_config.spark_config = SparkConfig() - remote_decorator_config.image_uri = _JobSettings._get_default_spark_image(sagemaker_session) + if not remote_decorator_config.image_uri: + remote_decorator_config.image_uri = _get_spark_image_uri(sagemaker_session) return remote_decorator_config diff --git a/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt b/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt index 2940fa2876..7d1e1f6239 100644 --- a/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt +++ b/sagemaker-mlops/tests/data/feature_store/feature_processor/requirements.txt @@ -1 +1,2 @@ -# unrelased sagemaker is installed via pre_execution_commands \ No newline at end of file +# unrelased sagemaker is installed via pre_execution_commands +sagemaker-feature-store-pyspark>=2,<3 \ No newline at end of file diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py index 6994971f22..f3b0864f1a 100644 --- a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_integ.py @@ -40,7 +40,9 @@ from sagemaker.mlops.feature_store import ( FeatureGroup, FeatureDefinition, + FeatureGroupManager, FeatureTypeEnum, + LakeFormationConfig, OnlineStoreConfig, OfflineStoreConfig, S3StorageConfig, @@ -526,11 +528,11 @@ def transform(raw_s3_data_as_df): ) -@pytest.mark.slow_test @pytest.mark.skipif( - not sys.version.startswith("3.9"), - reason="Only allow this test to run with py39", + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", ) +@pytest.mark.slow_test def test_feature_processor_transform_offline_only_store_ingestion_run_with_remote( sagemaker_session, pre_execution_commands, @@ -666,11 +668,11 @@ def transform(raw_s3_data_as_df): ) -@pytest.mark.slow_test @pytest.mark.skipif( - not sys.version.startswith("3.9"), - reason="Only allow this test to run with py39", + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", ) +@pytest.mark.slow_test def test_to_pipeline_and_execute( sagemaker_session, pre_execution_commands, @@ -789,11 +791,159 @@ def transform(raw_s3_data_as_df): # cleanup_pipeline(pipeline_name="pipeline-name-01", sagemaker_session=sagemaker_session) +@pytest.mark.skipif( + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", +) @pytest.mark.slow_test +def test_to_pipeline_and_execute_with_lake_formation( + sagemaker_session, + pre_execution_commands, + dependencies_path, +): + pipeline_name = "pipeline-name-lf-01" + car_data_feature_group_name = get_car_data_feature_group_name() + car_data_fg = None + try: + offline_store_s3_uri = get_offline_store_s3_uri(sagemaker_session=sagemaker_session) + role_arn = get_execution_role(sagemaker_session) + + lake_formation_config = LakeFormationConfig( + enabled=True, + hybrid_access_mode_enabled=False, + use_service_linked_role=False, + registration_role_arn=role_arn, + acknowledge_risk=True, + ) + car_data_fg = FeatureGroupManager.create( + feature_group_name=car_data_feature_group_name, + record_identifier_feature_name="id", + event_time_feature_name="ingest_time", + feature_definitions=CAR_SALES_FG_FEATURE_DEFINITIONS, + offline_store_config=OfflineStoreConfig( + s3_storage_config=S3StorageConfig(s3_uri=f"{offline_store_s3_uri}/car-data") + ), + online_store_config=OnlineStoreConfig(enable_online_store=True), + role_arn=role_arn, + lake_formation_config=lake_formation_config, + ) + car_data_arn = car_data_fg.feature_group_arn + + raw_data_uri = get_raw_car_data_s3_uri(sagemaker_session=sagemaker_session) + + @remote( + pre_execution_commands=pre_execution_commands, + dependencies=dependencies_path, + spark_config=SparkConfig(), + instance_type="ml.m5.xlarge", + ) + @feature_processor( + inputs=[CSVDataSource(raw_data_uri)], + output=car_data_arn, + target_stores=["OfflineStore"], + use_lake_formation_credentials=True, + ) + def transform(raw_s3_data_as_df): + """Load data from S3, perform basic feature engineering, store it in a Feature Group""" + from pyspark.sql.functions import regexp_replace + from pyspark.sql.functions import lit + + transformed_df = ( + raw_s3_data_as_df + # Rename Columns + .withColumnRenamed("Id", "id") + .withColumnRenamed("Model", "model") + .withColumnRenamed("Year", "model_year") + .withColumnRenamed("Status", "status") + .withColumnRenamed("Mileage", "mileage") + .withColumnRenamed("Price", "price") + .withColumnRenamed("MSRP", "msrp") + # Add Event Time + .withColumn("ingest_time", lit(int(time.time()))) + # Remove punctuation and fluff; replace with NA + .withColumn("Price", regexp_replace("Price", "\$", "")) # noqa: W605 + .withColumn("mileage", regexp_replace("mileage", "(,)|(mi\.)", "")) # noqa: W605 + .withColumn("mileage", regexp_replace("mileage", "Not available", "NA")) + .withColumn("price", regexp_replace("price", ",", "")) + .withColumn("msrp", regexp_replace("msrp", "(^MSRP\s\\$)|(,)", "")) # noqa: W605 + .withColumn("msrp", regexp_replace("msrp", "Not specified", "NA")) + .withColumn("msrp", regexp_replace("msrp", "\\$\d+[a-zA-Z\s]+", "NA")) # noqa: W605 + .withColumn("model", regexp_replace("model", "^\d\d\d\d\s", "")) # noqa: W605 + ) + + transformed_df.show() + return transformed_df + + _wait_for_feature_group_lineage_contexts( + car_data_feature_group_name, sagemaker_session + ) + + pipeline_arn = to_pipeline( + pipeline_name=pipeline_name, + step=transform, + role_arn=role_arn, + max_retries=2, + tags=[("integ_test_tag_key_1", "integ_test_tag_key_2")], + sagemaker_session=sagemaker_session, + ) + _sagemaker_client = get_sagemaker_client(sagemaker_session=sagemaker_session) + + assert pipeline_arn is not None + + tags = _sagemaker_client.list_tags(ResourceArn=pipeline_arn)["Tags"] + + tag_keys = [tag["Key"] for tag in tags] + assert "integ_test_tag_key_1" in tag_keys + + pipeline_description = Pipeline(name=pipeline_name).describe() + assert pipeline_arn == pipeline_description["PipelineArn"] + assert role_arn == pipeline_description["RoleArn"] + + pipeline_definition = json.loads(pipeline_description["PipelineDefinition"]) + assert len(pipeline_definition["Steps"]) == 1 + for retry_policy in pipeline_definition["Steps"][0]["RetryPolicies"]: + assert retry_policy["MaxAttempts"] == 2 + + pipeline_execution_arn = execute( + pipeline_name=pipeline_name, sagemaker_session=sagemaker_session + ) + + status = _wait_for_pipeline_execution_to_reach_terminal_state( + pipeline_execution_arn=pipeline_execution_arn, + sagemaker_client=_sagemaker_client, + ) + assert status == "Succeeded" + + finally: + if car_data_fg is not None: + cleanup_offline_store( + feature_group=car_data_fg, + sagemaker_session=sagemaker_session, + ) + try: + car_data_fg.refresh() + data_catalog_config = car_data_fg.offline_store_config.data_catalog_config + boto3_session = sagemaker_session.boto_session + glue_client = boto3_session.client( + "glue", region_name=sagemaker_session.boto_region_name + ) + glue_client.delete_table( + DatabaseName=data_catalog_config.database, + Name=data_catalog_config.table_name, + ) + logging.info( + f"Deleted Glue table {data_catalog_config.database}.{data_catalog_config.table_name}" + ) + except Exception as e: + logging.warning(f"Failed to delete Glue table: {e}") + cleanup_feature_group(car_data_fg, sagemaker_session=sagemaker_session) + + @pytest.mark.skipif( - not sys.version.startswith("3.9"), - reason="Only allow this test to run with py39", + sys.version_info[:2] not in [(3, 9), (3, 12)], + reason=f"SageMaker Spark image only supports Python 3.9 and 3.12, got {sys.version_info[:2]}", ) +@pytest.mark.slow_test def test_schedule_and_event_trigger( sagemaker_session, pre_execution_commands, @@ -1076,14 +1226,19 @@ def get_pre_execution_commands(sagemaker_session): """Build SDK wheels, upload to S3, and return pre-execution install commands.""" s3_prefix, wheel_names = get_wheel_file_s3_uri(sagemaker_session=sagemaker_session) sagemaker_whl, core_whl, mlops_whl = wheel_names - print(f'{sagemaker_whl=}, {core_whl=}, {mlops_whl}') - return [ - f"aws s3 cp {s3_prefix}/ /tmp/packages/ --recursive", - "pip3 install 'setuptools<75'", - f"pip3 install --no-build-isolation '/tmp/packages/{sagemaker_whl}[feature-processor]' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", - f"pip3 install --no-deps --force-reinstall /tmp/packages/{sagemaker_whl}", - f"pip3 install --no-deps --force-reinstall /tmp/packages/{core_whl} /tmp/packages/{mlops_whl}", + print(f'{sagemaker_whl=}, {core_whl=}, {mlops_whl=}') + PIP = "python3 -m pip install --root-user-action=ignore" + AWS = "python3 -m awscli" + cmds = [ + f"{PIP} awscli", + f"{AWS} s3 cp {s3_prefix}/ /tmp/packages/ --recursive", + f"{PIP} 'setuptools<75'", + f"{PIP} --no-build-isolation '/tmp/packages/{mlops_whl}' 'numpy<2.0.0' 'ml_dtypes<=0.4.1' 'setuptools<75' || true", + f"{PIP} --no-deps --force-reinstall /tmp/packages/{sagemaker_whl}", + f"{PIP} --no-deps --force-reinstall /tmp/packages/{core_whl} /tmp/packages/{mlops_whl}", ] + print(cmds) + return cmds def create_feature_groups( diff --git a/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py new file mode 100644 index 0000000000..12d798a00e --- /dev/null +++ b/sagemaker-mlops/tests/integ/feature_store/feature_processor/test_feature_processor_spark_compat.py @@ -0,0 +1,76 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +"""Integration tests for Spark multi-version compatibility.""" +from __future__ import absolute_import + +import pyspark +import pytest +from mock import Mock + +from sagemaker.mlops.feature_store.feature_processor._spark_factory import ( + SparkSessionFactory, + SPARK_TO_HADOOP_MAP, + _get_hadoop_version, +) +from sagemaker.mlops.feature_store.feature_processor._image_resolver import ( + SPARK_IMAGE_SUPPORT_MATRIX, + _get_spark_image_uri, +) + + +@pytest.mark.slow_test +def test_hadoop_version_resolves_for_installed_pyspark(): + """Verify that the installed PySpark version resolves to a known Hadoop version.""" + hadoop_version = _get_hadoop_version() + spark_major_minor = ".".join(pyspark.__version__.split(".")[:2]) + + if spark_major_minor in SPARK_TO_HADOOP_MAP: + assert hadoop_version == SPARK_TO_HADOOP_MAP[spark_major_minor] + else: + # Unknown version falls back to latest + assert hadoop_version == "3.3.4" + + +@pytest.mark.slow_test +def test_spark_session_factory_configs_include_dynamic_hadoop(): + """Verify SparkSessionFactory produces configs with the correct Hadoop Maven coordinates.""" + env_helper = Mock() + factory = SparkSessionFactory(env_helper) + configs = dict(factory._get_spark_configs(is_training_job=False)) + + hadoop_version = _get_hadoop_version() + packages = configs.get("spark.jars.packages", "") + assert f"org.apache.hadoop:hadoop-aws:{hadoop_version}" in packages + assert f"org.apache.hadoop:hadoop-common:{hadoop_version}" in packages + + +@pytest.mark.slow_test +def test_image_resolver_returns_uri_for_installed_pyspark(): + """Verify the image resolver returns a valid URI for the installed PySpark + Python version.""" + import sys + + spark_major_minor = ".".join(pyspark.__version__.split(".")[:2]) + py_version = f"py{sys.version_info[0]}{sys.version_info[1]}" + + supported_py = SPARK_IMAGE_SUPPORT_MATRIX.get(spark_major_minor) + if supported_py is None or py_version not in supported_py: + pytest.skip( + f"Spark {spark_major_minor} + {py_version} not in support matrix; " + f"skipping image resolver test" + ) + + session = Mock(boto_region_name="us-west-2") + image_uri = _get_spark_image_uri(session) + + assert "sagemaker-spark-processing" in image_uri + assert spark_major_minor in image_uri diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py index 25ded72266..b181378683 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_config_uploader.py @@ -238,6 +238,7 @@ def test_prepare_step_input_channel( ( input_data_config, spark_dependency_paths, + _public_key_pem, ) = config_uploader.prepare_step_input_channel_for_spark_mode( wrapped_func, config_uploader.remote_decorator_config.s3_root_uri, diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py index 9cd5655de5..8d62df3716 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_feature_scheduler.py @@ -162,6 +162,10 @@ def config_uploader(): return uploader +@patch( + "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) @patch( "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._validate_fg_lineage_resources", return_value=None, @@ -171,7 +175,7 @@ def config_uploader(): return_value=mock_pipeline(), ) @patch( - "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.mlops.feature_store.feature_processor._config_uploader.TrainingInput") @@ -298,7 +302,14 @@ def test_to_pipeline( mock_dependency_upload.assert_called_once_with( "/tmp/snapshot", True, - None, + [ + "pip install --root-user-action=ignore 'sagemaker-feature-store-pyspark>=2,<3'", + ( + "python3 -c \"import feature_store_pyspark, shutil; " + "[shutil.copy(j, '/usr/lib/spark/jars/') " + "for j in feature_store_pyspark.classpath_jars()]\"" + ), + ], None, f"{S3_URI}/pipeline_name", None, @@ -470,9 +481,13 @@ def test_to_pipeline_not_wrapped_by_remote(get_execution_role, session): ) -@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) @patch( "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) +@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) +@patch( + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN) @@ -522,9 +537,13 @@ def test_to_pipeline_wrong_mode(get_execution_role, mock_spark_image, session): ) -@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) @patch( "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) +@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) +@patch( + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN) @@ -577,9 +596,13 @@ def test_to_pipeline_pipeline_name_length_limit_exceeds( ) -@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) @patch( "sagemaker.core.remote_function.job._JobSettings._get_default_spark_image", + new=Mock(return_value="some_image_uri"), +) +@patch("sagemaker.core.remote_function.job.Session", return_value=mock_session()) +@patch( + "sagemaker.mlops.feature_store.feature_processor.feature_scheduler._get_spark_image_uri", return_value="some_image_uri", ) @patch("sagemaker.core.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py new file mode 100644 index 0000000000..965dc2c8a4 --- /dev/null +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_image_resolver.py @@ -0,0 +1,104 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from __future__ import absolute_import + +import sys + +import pyspark +import pytest +from mock import Mock, patch + +from sagemaker.mlops.feature_store.feature_processor._image_resolver import _get_spark_image_uri + + +@patch("sagemaker.mlops.feature_store.feature_processor._image_resolver.image_uris.retrieve") +def test_spark_33_py39(mock_retrieve): + mock_retrieve.return_value = "123456.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.3-cpu-py39-v1" + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.3.2"), \ + patch.object(sys, "version_info", (3, 9, 0)): + result = _get_spark_image_uri(session) + mock_retrieve.assert_called_once_with( + framework="spark", + region="us-west-2", + version="3.3", + py_version="py39", + container_version="v1", + ) + assert result == mock_retrieve.return_value + + +@patch("sagemaker.mlops.feature_store.feature_processor._image_resolver.image_uris.retrieve") +def test_spark_35_py39(mock_retrieve): + mock_retrieve.return_value = "123456.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py39-v1" + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.5.1"), \ + patch.object(sys, "version_info", (3, 9, 0)): + result = _get_spark_image_uri(session) + mock_retrieve.assert_called_once_with( + framework="spark", + region="us-west-2", + version="3.5", + py_version="py39", + container_version="v1", + ) + assert result == mock_retrieve.return_value + + +@patch("sagemaker.mlops.feature_store.feature_processor._image_resolver.image_uris.retrieve") +def test_spark_35_py312(mock_retrieve): + mock_retrieve.return_value = "123456.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py312-v1" + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.5.1"), \ + patch.object(sys, "version_info", (3, 12, 0)): + result = _get_spark_image_uri(session) + mock_retrieve.assert_called_once_with( + framework="spark", + region="us-west-2", + version="3.5", + py_version="py312", + container_version="v1", + ) + assert result == mock_retrieve.return_value + + +def test_spark_34_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.4.1"), \ + patch.object(sys, "version_info", (3, 9, 0)): + with pytest.raises(ValueError, match="No SageMaker Spark container image available for Spark 3.4"): + _get_spark_image_uri(session) + + +def test_spark_35_py310_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.5.1"), \ + patch.object(sys, "version_info", (3, 10, 0)): + with pytest.raises(ValueError, match="SageMaker Spark 3.5 container images support"): + _get_spark_image_uri(session) + + +def test_spark_33_py312_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.3.2"), \ + patch.object(sys, "version_info", (3, 12, 0)): + with pytest.raises(ValueError, match="SageMaker Spark 3.3 container images support"): + _get_spark_image_uri(session) + + +def test_unknown_spark_version_raises(): + session = Mock(boto_region_name="us-west-2") + with patch.object(pyspark, "__version__", "3.6.0"), \ + patch.object(sys, "version_info", (3, 9, 0)): + with pytest.raises(ValueError, match="No SageMaker Spark container image available for Spark 3.6"): + _get_spark_image_uri(session) diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py index 09fade48ed..1e0ab6b4c5 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_spark_session_factory.py @@ -14,6 +14,7 @@ from __future__ import absolute_import import feature_store_pyspark +import pyspark import pytest from mock import Mock, patch, call @@ -72,10 +73,12 @@ def test_spark_session_factory_configuration(): # Verify configurations when not running on a training job assert ",".join(feature_store_pyspark.classpath_jars()) in spark_configs.get("spark.jars") + from sagemaker.mlops.feature_store.feature_processor._spark_factory import _get_hadoop_version + hadoop_version = _get_hadoop_version() assert ",".join( [ - "org.apache.hadoop:hadoop-aws:3.3.1", - "org.apache.hadoop:hadoop-common:3.3.1", + f"org.apache.hadoop:hadoop-aws:{hadoop_version}", + f"org.apache.hadoop:hadoop-common:{hadoop_version}", ] ) in spark_configs.get("spark.jars.packages") @@ -88,9 +91,11 @@ def test_spark_session_factory_configuration_on_training_job(): spark_config = spark_session_factory._get_spark_configs(is_training_job=True) assert dict(spark_config).get("spark.test.key") == "spark.test.value" - assert all(tup[0] != "spark.jars" for tup in spark_config) assert all(tup[0] != "spark.jars.packages" for tup in spark_config) + # spark.jars should always be present (Feature Store JARs are always on the classpath) + assert ",".join(feature_store_pyspark.classpath_jars()) in dict(spark_config).get("spark.jars") + @patch("pyspark.context.SparkContext.getOrCreate") def test_spark_session_factory(mock_spark_context): @@ -173,3 +178,34 @@ def test_spark_session_factory_get_spark_session_with_iceberg_config(env_helper) == "smfs.shaded.org.apache.iceberg.aws.s3.S3FileIO" ) assert iceberg_configs.get("spark.sql.catalog.catalog.glue.skip-name-validation") == "true" + + +@pytest.mark.parametrize( + "spark_version,expected_hadoop", + [ + ("3.1.3", "3.2.0"), + ("3.2.2", "3.3.1"), + ("3.3.2", "3.3.2"), + ("3.4.1", "3.3.4"), + ("3.5.1", "3.3.4"), + ], +) +def test_get_hadoop_version(spark_version, expected_hadoop): + with patch.object(pyspark, "__version__", spark_version): + from sagemaker.mlops.feature_store.feature_processor._spark_factory import _get_hadoop_version + assert _get_hadoop_version() == expected_hadoop + + +def test_get_hadoop_version_unknown_falls_back(): + with patch.object(pyspark, "__version__", "3.6.0"): + from sagemaker.mlops.feature_store.feature_processor._spark_factory import _get_hadoop_version + assert _get_hadoop_version() == "3.3.4" + + +def test_spark_configs_use_dynamic_hadoop_version(): + with patch.object(pyspark, "__version__", "3.5.1"): + env_helper = Mock() + factory = SparkSessionFactory(env_helper) + configs = dict(factory._get_spark_configs(is_training_job=False)) + assert "org.apache.hadoop:hadoop-aws:3.3.4" in configs.get("spark.jars.packages") + assert "org.apache.hadoop:hadoop-common:3.3.4" in configs.get("spark.jars.packages") diff --git a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py index 34770a011a..535bec725a 100644 --- a/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py +++ b/sagemaker-mlops/tests/unit/sagemaker/mlops/feature_store/feature_processor/test_udf_output_receiver.py @@ -64,6 +64,7 @@ def test_ingest_udf_output(df, feature_store_manager, spark_output_receiver): input_data_frame=df, feature_group_arn=fp_config.output, target_stores=fp_config.target_stores, + use_lake_formation_credentials=fp_config.use_lake_formation_credentials, )