diff --git a/.gitignore b/.gitignore
index 9b34b7af..22f9e044 100644
--- a/.gitignore
+++ b/.gitignore
@@ -191,4 +191,11 @@ Thumbs.db
 
 # Milvus
 **/volumes/
-**/rag_storage/
\ No newline at end of file
+**/rag_storage/
+
+# AI tools/agent
+.agents/
+.claude/
+data/
+openspec/
+devspace.yaml
diff --git a/runtime/datamate-python/app/core/security.py b/runtime/datamate-python/app/core/security.py
new file mode 100644
index 00000000..3b8c0618
--- /dev/null
+++ b/runtime/datamate-python/app/core/security.py
@@ -0,0 +1,192 @@
+"""Security utilities for handling sensitive data"""
+import re
+from typing import Any, Dict, List
+
+
+SENSITIVE_FIELDS = [
+    "secretKey",
+    "accessKey",
+    "password",
+    "passwd",
+    "pwd",
+    "secret",
+    "token",
+    "apiKey",
+    "api_key",
+    "access_key",
+    "secret_key",
+]
+
+MASK_PATTERN = "**********"
+
+# Lower-cased once; key matching below is case-insensitive.
+_SENSITIVE_FIELDS_LOWER = tuple(field.lower() for field in SENSITIVE_FIELDS)
+
+# Every field name as one regex alternation, shared by the text patterns below.
+_FIELD_ALTERNATION = "|".join(re.escape(field) for field in SENSITIVE_FIELDS)
+
+# Compiled once because mask_sensitive_info() runs for every log line.
+# Group 1 captures the key and separator so they are written back unchanged;
+# the second tuple item is the quote character to put around the mask.
+_SENSITIVE_TEXT_PATTERNS = (
+    # "field": "value" (the value may contain backslash-escaped quotes)
+    (
+        re.compile(
+            rf'("(?:{_FIELD_ALTERNATION})"\s*:\s*)"(?:[^"\\]|\\.)*"',
+            re.IGNORECASE,
+        ),
+        '"',
+    ),
+    # field=value (quoted value, or a bare token up to whitespace, "," or "]")
+    (
+        re.compile(
+            rf'((?:{_FIELD_ALTERNATION})\s*=\s*)'
+            rf'''(?:"(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*'|[^\s,\]]+)''',
+            re.IGNORECASE,
+        ),
+        "",
+    ),
+    # 'field': 'value'
+    (
+        re.compile(
+            rf"('(?:{_FIELD_ALTERNATION})'\s*:\s*)'(?:[^'\\]|\\.)*'",
+            re.IGNORECASE,
+        ),
+        "'",
+    ),
+)
+
+
+def _is_sensitive_key(key: Any) -> bool:
+    """Return True when a sensitive field name occurs in *key* (case-insensitive)."""
+    if not isinstance(key, str):
+        return False
+    key_lower = key.lower()
+    return any(field in key_lower for field in _SENSITIVE_FIELDS_LOWER)
+
+
+def mask_sensitive_value(value: str) -> str:
+    """Mask a single sensitive value"""
+    return MASK_PATTERN
+
+
+def is_masked_value(value: str) -> bool:
+    """Check if a value is already masked"""
+    return value == MASK_PATTERN
+
+
+def _preserve_list(new_items: List[Any], original_items: Any) -> List[Any]:
+    """Restore masked values inside dict items, pairing list items by position."""
+    if not isinstance(original_items, list):
+        # Key absent, null or of another type in the stored config: nothing to pair with.
+        original_items = []
+
+    preserved_list = []
+    for index, new_item in enumerate(new_items):
+        original_item = original_items[index] if index < len(original_items) else None
+        if isinstance(new_item, dict) and isinstance(original_item, dict):
+            preserved_list.append(preserve_sensitive_values(new_item, original_item))
+        else:
+            preserved_list.append(new_item)
+    return preserved_list
+
+
+def preserve_sensitive_values(
+    new_config: Dict[str, Any],
+    original_config: Dict[str, Any]
+) -> Dict[str, Any]:
+    """
+    Preserve original sensitive values if masked pattern is detected in update request.
+
+    When frontend receives masked values and sends them back in update request,
+    this function detects masked values and replaces them with original values
+    from database.
+
+    Args:
+        new_config: Config from update request (may contain masked values)
+        original_config: Original config from database (contains real values)
+
+    Returns:
+        Config with original sensitive values preserved. A masked value with
+        no non-empty original is returned as submitted.
+    """
+    if not isinstance(new_config, dict) or not isinstance(original_config, dict):
+        return new_config
+
+    preserved_config = {}
+    for key, new_value in new_config.items():
+        original_value = original_config.get(key)
+        if isinstance(new_value, dict):
+            # A non-dict original gives the recursion nothing to restore from.
+            original_nested = original_value if isinstance(original_value, dict) else {}
+            preserved_config[key] = preserve_sensitive_values(new_value, original_nested)
+        elif isinstance(new_value, list):
+            preserved_config[key] = _preserve_list(new_value, original_value)
+        elif (
+            isinstance(new_value, str)
+            and is_masked_value(new_value)
+            and _is_sensitive_key(key)
+            and original_value
+        ):
+            preserved_config[key] = original_value
+        else:
+            # Not sensitive, not masked, or no stored value to restore
+            preserved_config[key] = new_value
+
+    return preserved_config
+
+
+def mask_sensitive_dict(data: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Recursively mask sensitive fields in a dictionary.
+
+    Args:
+        data: Dictionary that may contain sensitive fields
+
+    Returns:
+        Dictionary with sensitive string values masked
+    """
+    if not isinstance(data, dict):
+        return data
+
+    masked_data = {}
+    for key, value in data.items():
+        if isinstance(value, str) and _is_sensitive_key(key):
+            masked_data[key] = MASK_PATTERN
+        elif isinstance(value, dict):
+            masked_data[key] = mask_sensitive_dict(value)
+        elif isinstance(value, list):
+            # Only dict items are descended into; other items are kept as-is.
+            masked_data[key] = [
+                mask_sensitive_dict(item) if isinstance(item, dict) else item
+                for item in value
+            ]
+        else:
+            masked_data[key] = value
+
+    return masked_data
+
+
+def mask_sensitive_info(text: str) -> str:
+    """
+    Mask sensitive information in text by replacing values with **********
+
+    Handles "field": "value", 'field': 'value' and field=value with
+    case-insensitive field names. The key and separator are kept as written;
+    only the value is replaced.
+
+    Args:
+        text: Original text that may contain sensitive information
+
+    Returns:
+        Text with sensitive values masked
+    """
+    masked_text = text
+    for pattern, quote in _SENSITIVE_TEXT_PATTERNS:
+        masked_text = pattern.sub(
+            # A callable replacement keeps MASK_PATTERN literal (no template escapes).
+            lambda match, quote=quote: f"{match.group(1)}{quote}{MASK_PATTERN}{quote}",
+            masked_text,
+        )
+
+    return masked_text
diff --git a/runtime/datamate-python/app/module/collection/client/datax_client.py b/runtime/datamate-python/app/module/collection/client/datax_client.py
index 254f704b..85a6dc28 100644
--- a/runtime/datamate-python/app/module/collection/client/datax_client.py
+++ b/runtime/datamate-python/app/module/collection/client/datax_client.py
@@ -6,6 +6,7 @@
 from typing import Dict, Any
 
 from app.core.logging import get_logger
+from app.core.security import mask_sensitive_info
 from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
 from app.module.collection.schema.collection import CollectionConfig, SyncMode
 from app.module.shared.schema import TaskStatus
@@ -118,25 +119,19 @@ def run_datax_job(self):
         Returns:
             执行结果字典
         """
-        # 创建配置文件
-        self.create__config_file()
+        if not self.execution.started_at:
+            self.execution.started_at = datetime.now()
+
         try:
-            # 构建命令
+            self.create__config_file()
             cmd = [self.python_path, str(self.datax_main), str(self.config_file_path)]
             cmd_str = ' '.join(cmd)
             logger.info(f"执行命令: {cmd_str}")
-            if not self.execution.started_at:
-                self.execution.started_at = datetime.now()
-            # 执行命令并写入日志
             with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
-                # 写入头信息
                 self.write_header_log(cmd_str, log_f)
-                # 启动datax进程
                 exit_code = self._run_process(cmd, log_f)
-                # 记录结束时间
                 self.execution.completed_at = datetime.now()
                 self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
-                # 写入结束信息
                 self.write_tail_log(exit_code, log_f)
                 if exit_code == 0:
                     logger.info(f"DataX 任务执行成功: {self.execution.id}")
@@ -149,17 +144,22 @@ def run_datax_job(self):
                     logger.error(self.execution.error_message)
         except Exception as e:
             self.execution.completed_at = datetime.now()
-            self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
+            if self.execution.started_at:
+                self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
             self.execution.error_message = f"执行异常: {e}"
             self.execution.status = TaskStatus.FAILED.name
             logger.error(f"执行异常: {e}", exc_info=True)
+            try:
+                # Append: the log may already hold output written before the failure.
+                with open(self.execution.log_path, 'a', encoding='utf-8') as log_f:
+                    log_f.write(f"任务执行失败: {mask_sensitive_info(str(e))}\n")
+            except OSError as log_error:
+                # Keep going so the task status update below still runs.
+                logger.error(f"写入执行日志失败: {log_error}")
 
-        # 根据同步模式更新任务状态
         if self.task.sync_mode == SyncMode.ONCE:
-            # 一次性任务:使用执行结果作为最终状态
             self.task.status = self.execution.status
         else:
-            # 定时任务:恢复为 PENDING 状态,等待下次执行
             self.task.status = TaskStatus.PENDING.name
 
     def rename_collection_result(self):
@@ -229,10 +229,12 @@ def write_header_log(self, cmd: str, log_f):
 
     @staticmethod
     def read_stream(stream, log_f):
-        """读取输出流"""
+        """Copy each non-empty line of the stream to the log with sensitive values masked."""
         for line in stream:
             line = line.rstrip('\n')
             if line:
+                # Mask sensitive information before writing to log
+                masked_line = mask_sensitive_info(line)
                 # 写入日志文件
-                log_f.write(f"{line}\n")
+                log_f.write(f"{masked_line}\n")
                 log_f.flush()
diff --git a/runtime/datamate-python/app/module/collection/interface/collection.py b/runtime/datamate-python/app/module/collection/interface/collection.py
index eebeafc7..0d1cbef9 100644
--- a/runtime/datamate-python/app/module/collection/interface/collection.py
+++ b/runtime/datamate-python/app/module/collection/interface/collection.py
@@ -12,11 +12,12 @@
 
 from app.core.exception import ErrorCodes, BusinessError, SuccessResponse, transaction
 from app.core.logging import get_logger
+from app.core.security import preserve_sensitive_values
 from app.db.models import Dataset, DatasetFiles
 from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
 from app.db.session import get_db
 from app.module.collection.client.datax_client import DataxClient
-from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, converter_to_response, \
+from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, CollectionConfig, converter_to_response, \
     convert_for_create, SyncMode
 from app.module.collection.schedule import schedule_collection_task, remove_collection_task
 from app.module.collection.service.collection import CollectionTaskService
@@ -300,12 +301,28 @@ async def update_task(
             reschedule_collection_task(task_id, task.schedule_expression)
 
         if 'config' in update_data:
-            # 重新生成任务配置文件
+            # Get original config from database
+            original_config = json.loads(task.config) if task.config else {}
+
+            # Preserve sensitive values if masked pattern detected
+            preserved_config = preserve_sensitive_values(
+                request.config.model_dump(),
+                original_config
+            )
+
+            # Regenerate task config file with preserved sensitive values
             template = await db.execute(select(CollectionTemplate).where(CollectionTemplate.id == task.template_id))
             template = template.scalar_one_or_none()
             if template:
-                DataxClient.generate_datx_config(request.config, template, task.target_path)
-                task.config = json.dumps(request.config.dict())
+                # Use preserved config to generate DataX config
+                config_obj = CollectionConfig(**preserved_config)
+                DataxClient.generate_datx_config(
+                    config_obj,
+                    template,
+                    task.target_path
+                )
+                # Save the modified config (with regenerated job) to database
+                task.config = json.dumps(config_obj.model_dump())
 
         # 如果任务处于 FAILED 状态,修改后重置为 PENDING,允许重新执行
         if task.status == TaskStatus.FAILED.name:
diff --git a/runtime/datamate-python/app/module/collection/schema/collection.py b/runtime/datamate-python/app/module/collection/schema/collection.py
index c8608cba..5ae62128 100644
--- a/runtime/datamate-python/app/module/collection/schema/collection.py
+++ b/runtime/datamate-python/app/module/collection/schema/collection.py
@@ -2,11 +2,12 @@
 import uuid
 from datetime import datetime
 from enum import Enum
-from typing import Optional
+from typing import Optional, Dict, Any
 
-from pydantic import BaseModel, Field, validator, ConfigDict, field_validator
+from pydantic import BaseModel, Field, validator, ConfigDict, field_validator, field_serializer
 from pydantic.alias_generators import to_camel
 
+from app.core.security import mask_sensitive_dict
 from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
 from app.module.dataset.schema import DatasetTypeResponse
 from app.module.dataset.schema.dataset import DatasetType
@@ -23,6 +24,13 @@ class CollectionConfig(BaseModel):
     writer: Optional[dict] = Field(None, description="writer参数")
     job: Optional[dict] = Field(None, description="任务配置")
 
+    @field_serializer('parameter', 'reader', 'writer', 'job', when_used='json')
+    def mask_sensitive_fields(self, value: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+        """Mask sensitive fields when serializing to JSON"""
+        if value is None:
+            return value
+        return mask_sensitive_dict(value)
+
 class CollectionTaskBase(BaseModel):
     id: str = Field(..., description="任务id")
     name: str = Field(..., description="任务名称")
@@ -93,6 +101,7 @@ def validate_timeout(cls, v):
     )
 
 def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
+    config_dict = json.loads(task.config) if task.config else {}
     return CollectionTaskBase(
         id=task.id,
         name=task.name,
@@ -101,7 +110,7 @@ def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
         template_id=task.template_id,
         template_name=task.template_name,
         target_path=task.target_path,
-        config=json.loads(task.config),
+        config=CollectionConfig(**config_dict),
         schedule_expression=task.schedule_expression,
         status=task.status,
         retry_count=task.retry_count,