From 93b659f700077d5de7464dd444d91d4dc9d71b98 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Mon, 18 May 2026 20:27:28 +0800 Subject: [PATCH 1/3] :lock: hide privacy message in task log --- .../module/collection/client/datax_client.py | 57 ++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/runtime/datamate-python/app/module/collection/client/datax_client.py b/runtime/datamate-python/app/module/collection/client/datax_client.py index 254f704b..ed86431c 100644 --- a/runtime/datamate-python/app/module/collection/client/datax_client.py +++ b/runtime/datamate-python/app/module/collection/client/datax_client.py @@ -1,4 +1,5 @@ import json +import re import threading import subprocess from datetime import datetime @@ -12,6 +13,56 @@ logger = get_logger(__name__) +# Sensitive fields that need to be masked in logs +SENSITIVE_FIELDS = [ + "secretKey", + "accessKey", + "password", + "passwd", + "pwd", + "secret", + "token", + "apiKey", + "api_key", +] + +MASK_PATTERN = "**********" + +def mask_sensitive_info(text: str) -> str: + """ + Mask sensitive information in text by replacing values with ********** + + Args: + text: Original text that may contain sensitive information + + Returns: + Text with sensitive values masked + """ + masked_text = text + + for field in SENSITIVE_FIELDS: + # Match patterns like: "secretKey": "actual_value" or secretKey=actual_value + patterns = [ + # JSON format: "field": "value" + rf'"{field}"\s*:\s*"[^"]*"', + rf'"{field}"\s*:\s*"[^"]*"', + # Key-value format: field=value + rf'{field}\s*=\s*[^\s,\]]+', + # Quoted format: 'field': 'value' + rf"'{field}'\s*:\s*'[^']*'", + ] + + for pattern in patterns: + # Replace the value part while keeping the field name + if '"' in pattern: + masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text) + elif "'" in pattern: + masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text) + else: + masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text) + + return masked_text + class DataxClient: def __init__(self, task: CollectionTask, execution: TaskExecution, template: CollectionTemplate): self.execution = execution @@ -229,10 +280,12 @@ def write_header_log(self, cmd: str, log_f): @staticmethod def read_stream(stream, log_f): - """读取输出流""" + """读取输出流并屏蔽敏感信息""" for line in stream: line = line.rstrip('\n') if line: + # Mask sensitive information before writing to log + masked_line = mask_sensitive_info(line) # 写入日志文件 - log_f.write(f"{line}\n") + log_f.write(f"{masked_line}\n") log_f.flush() From 7e26aaa2fad03dc3a74582f0b4d6668e3e390d7d Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 20 May 2026 10:27:23 +0800 Subject: [PATCH 2/3] :lock: mask sensitive data --- .gitignore | 9 +- runtime/datamate-python/app/core/security.py | 156 ++++++++++++++++++ .../module/collection/client/datax_client.py | 52 +----- .../module/collection/interface/collection.py | 24 ++- .../module/collection/schema/collection.py | 15 +- 5 files changed, 197 insertions(+), 59 deletions(-) create mode 100644 runtime/datamate-python/app/core/security.py diff --git a/.gitignore b/.gitignore index 9b34b7af..22f9e044 100644 --- a/.gitignore +++ b/.gitignore @@ -191,4 +191,11 @@ Thumbs.db # Milvus **/volumes/ -**/rag_storage/ \ No newline at end of file +**/rag_storage/ + +# AI tools/agent +.agents/ +.claude/ +data/ +openspec/ +devspace.yaml diff --git a/runtime/datamate-python/app/core/security.py b/runtime/datamate-python/app/core/security.py new file mode 100644 index 00000000..548bd93d --- /dev/null +++ b/runtime/datamate-python/app/core/security.py @@ -0,0 +1,156 @@ +"""Security utilities for handling sensitive data""" +import re +from typing import Any, Dict + + +SENSITIVE_FIELDS = [ + "secretKey", + "accessKey", + "password", + "passwd", + "pwd", + "secret", + "token", + "apiKey", + "api_key", + "access_key", + "secret_key", +] + +MASK_PATTERN = "**********" + + +def mask_sensitive_value(value: str) -> str: + """Mask a single sensitive value""" + return MASK_PATTERN + + +def is_masked_value(value: str) -> bool: + """Check if a value is already masked""" + return value == MASK_PATTERN + + +def preserve_sensitive_values( + new_config: Dict[str, Any], + original_config: Dict[str, Any] +) -> Dict[str, Any]: + """ + Preserve original sensitive values if masked pattern is detected in update request. + + When frontend receives masked values and sends them back in update request, + this function detects masked values and replaces them with original values + from database. + + Args: + new_config: Config from update request (may contain masked values) + original_config: Original config from database (contains real values) + + Returns: + Config with original sensitive values preserved + """ + if not isinstance(new_config, dict) or not isinstance(original_config, dict): + return new_config + + preserved_config = {} + for key, new_value in new_config.items(): + # Check if key is a sensitive field + key_lower = key.lower() + is_sensitive = any( + field.lower() == key_lower or field.lower() in key_lower + for field in SENSITIVE_FIELDS + ) + + # If value is masked and field is sensitive, use original value + if is_sensitive and isinstance(new_value, str) and is_masked_value(new_value): + original_value = original_config.get(key) + preserved_config[key] = original_value if original_value else new_value + elif isinstance(new_value, dict): + # Recursively process nested dictionaries + original_nested = original_config.get(key, {}) + preserved_config[key] = preserve_sensitive_values(new_value, original_nested) + elif isinstance(new_value, list): + # Process list items (preserve original if matching structure) + original_list = original_config.get(key, []) + preserved_config[key] = [ + preserve_sensitive_values(new_item, orig_item) + if isinstance(new_item, dict) and isinstance(orig_item, dict) + else new_item + for new_item, orig_item in zip(new_value, original_list) + ] if len(new_value) == len(original_list) else new_value + else: + # Keep non-sensitive/non-masked values + preserved_config[key] = new_value + + return preserved_config + + +def mask_sensitive_dict(data: Dict[str, Any]) -> Dict[str, Any]: + """ + Recursively mask sensitive fields in a dictionary. + + Args: + data: Dictionary that may contain sensitive fields + + Returns: + Dictionary with sensitive values masked + """ + if not isinstance(data, dict): + return data + + masked_data = {} + for key, value in data.items(): + # Check if the key is a sensitive field (case-insensitive) + key_lower = key.lower() + is_sensitive = any( + field.lower() == key_lower or field.lower() in key_lower + for field in SENSITIVE_FIELDS + ) + + if is_sensitive and isinstance(value, str): + # Mask the sensitive value + masked_data[key] = MASK_PATTERN + elif isinstance(value, dict): + # Recursively mask nested dictionaries + masked_data[key] = mask_sensitive_dict(value) + elif isinstance(value, list): + # Process list items + masked_data[key] = [ + mask_sensitive_dict(item) if isinstance(item, dict) else item + for item in value + ] + else: + # Keep non-sensitive values as-is + masked_data[key] = value + + return masked_data + + +def mask_sensitive_info(text: str) -> str: + """ + Mask sensitive information in text by replacing values with ********** + + Args: + text: Original text that may contain sensitive information + + Returns: + Text with sensitive values masked + """ + masked_text = text + + for field in SENSITIVE_FIELDS: + patterns = [ + rf'"{field}"\s*:\s*"[^"]*"', + rf'"{field}"\s*:\s*"[^"]*"', + rf'{field}\s*=\s*[^\s,\]]+', + rf"'{field}'\s*:\s*'[^']*'", + ] + + for pattern in patterns: + if '"' in pattern: + masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text) + elif "'" in pattern: + masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text) + else: + masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text) + + return masked_text \ No newline at end of file diff --git a/runtime/datamate-python/app/module/collection/client/datax_client.py b/runtime/datamate-python/app/module/collection/client/datax_client.py index ed86431c..42c09a86 100644 --- a/runtime/datamate-python/app/module/collection/client/datax_client.py +++ b/runtime/datamate-python/app/module/collection/client/datax_client.py @@ -1,5 +1,4 @@ import json -import re import threading import subprocess from datetime import datetime @@ -7,62 +6,13 @@ from typing import Dict, Any from app.core.logging import get_logger +from app.core.security import mask_sensitive_info from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate from app.module.collection.schema.collection import CollectionConfig, SyncMode from app.module.shared.schema import TaskStatus logger = get_logger(__name__) -# Sensitive fields that need to be masked in logs -SENSITIVE_FIELDS = [ - "secretKey", - "accessKey", - "password", - "passwd", - "pwd", - "secret", - "token", - "apiKey", - "api_key", -] - -MASK_PATTERN = "**********" - -def mask_sensitive_info(text: str) -> str: - """ - Mask sensitive information in text by replacing values with ********** - - Args: - text: Original text that may contain sensitive information - - Returns: - Text with sensitive values masked - """ - masked_text = text - - for field in SENSITIVE_FIELDS: - # Match patterns like: "secretKey": "actual_value" or secretKey=actual_value - patterns = [ - # JSON format: "field": "value" - rf'"{field}"\s*:\s*"[^"]*"', - rf'"{field}"\s*:\s*"[^"]*"', - # Key-value format: field=value - rf'{field}\s*=\s*[^\s,\]]+', - # Quoted format: 'field': 'value' - rf"'{field}'\s*:\s*'[^']*'", - ] - - for pattern in patterns: - # Replace the value part while keeping the field name - if '"' in pattern: - masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text) - elif "'" in pattern: - masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text) - else: - masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text) - - return masked_text - class DataxClient: def __init__(self, task: CollectionTask, execution: TaskExecution, template: CollectionTemplate): self.execution = execution diff --git a/runtime/datamate-python/app/module/collection/interface/collection.py b/runtime/datamate-python/app/module/collection/interface/collection.py index eebeafc7..998a19cc 100644 --- a/runtime/datamate-python/app/module/collection/interface/collection.py +++ b/runtime/datamate-python/app/module/collection/interface/collection.py @@ -12,11 +12,12 @@ from app.core.exception import ErrorCodes, BusinessError, SuccessResponse, transaction from app.core.logging import get_logger +from app.core.security import preserve_sensitive_values from app.db.models import Dataset, DatasetFiles from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate from app.db.session import get_db from app.module.collection.client.datax_client import DataxClient -from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, converter_to_response, \ +from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, CollectionConfig, converter_to_response, \ convert_for_create, SyncMode from app.module.collection.schedule import schedule_collection_task, remove_collection_task from app.module.collection.service.collection import CollectionTaskService @@ -300,12 +301,27 @@ async def update_task( reschedule_collection_task(task_id, task.schedule_expression) if 'config' in update_data: - # 重新生成任务配置文件 + # Get original config from database + original_config = json.loads(task.config) if task.config else {} + + # Preserve sensitive values if masked pattern detected + preserved_config = preserve_sensitive_values( + request.config.dict(), + original_config + ) + + # Regenerate task config file with preserved sensitive values template = await db.execute(select(CollectionTemplate).where(CollectionTemplate.id == task.template_id)) template = template.scalar_one_or_none() if template: - DataxClient.generate_datx_config(request.config, template, task.target_path) - task.config = json.dumps(request.config.dict()) + # Use preserved config to generate DataX config + DataxClient.generate_datx_config( + CollectionConfig(**preserved_config), + template, + task.target_path + ) + # Save preserved config to database + task.config = json.dumps(preserved_config) # 如果任务处于 FAILED 状态,修改后重置为 PENDING,允许重新执行 if task.status == TaskStatus.FAILED.name: diff --git a/runtime/datamate-python/app/module/collection/schema/collection.py b/runtime/datamate-python/app/module/collection/schema/collection.py index c8608cba..5ae62128 100644 --- a/runtime/datamate-python/app/module/collection/schema/collection.py +++ b/runtime/datamate-python/app/module/collection/schema/collection.py @@ -2,11 +2,12 @@ import uuid from datetime import datetime from enum import Enum -from typing import Optional +from typing import Optional, Dict, Any -from pydantic import BaseModel, Field, validator, ConfigDict, field_validator +from pydantic import BaseModel, Field, validator, ConfigDict, field_validator, field_serializer from pydantic.alias_generators import to_camel +from app.core.security import mask_sensitive_dict from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate from app.module.dataset.schema import DatasetTypeResponse from app.module.dataset.schema.dataset import DatasetType @@ -23,6 +24,13 @@ class CollectionConfig(BaseModel): writer: Optional[dict] = Field(None, description="writer参数") job: Optional[dict] = Field(None, description="任务配置") + @field_serializer('parameter', 'reader', 'writer', 'job', when_used='json') + def mask_sensitive_fields(self, value: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """Mask sensitive fields when serializing to JSON""" + if value is None: + return value + return mask_sensitive_dict(value) + class CollectionTaskBase(BaseModel): id: str = Field(..., description="任务id") name: str = Field(..., description="任务名称") @@ -93,6 +101,7 @@ def validate_timeout(cls, v): ) def converter_to_response(task: CollectionTask) -> CollectionTaskBase: + config_dict = json.loads(task.config) return CollectionTaskBase( id=task.id, name=task.name, @@ -101,7 +110,7 @@ def converter_to_response(task: CollectionTask) -> CollectionTaskBase: template_id=task.template_id, template_name=task.template_name, target_path=task.target_path, - config=json.loads(task.config), + config=CollectionConfig(**config_dict), schedule_expression=task.schedule_expression, status=task.status, retry_count=task.retry_count, From 6bb6bdc3b4ead37a55cdc2606855ab006a3a47aa Mon Sep 17 00:00:00 2001 From: MoeexT Date: Wed, 20 May 2026 16:31:28 +0800 Subject: [PATCH 3/3] :bug: could cause task updates to fail --- runtime/datamate-python/app/core/security.py | 25 +++++++++++-------- .../module/collection/client/datax_client.py | 22 ++++++---------- .../module/collection/interface/collection.py | 7 +++--- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/runtime/datamate-python/app/core/security.py b/runtime/datamate-python/app/core/security.py index 548bd93d..3b8c0618 100644 --- a/runtime/datamate-python/app/core/security.py +++ b/runtime/datamate-python/app/core/security.py @@ -69,14 +69,18 @@ def preserve_sensitive_values( original_nested = original_config.get(key, {}) preserved_config[key] = preserve_sensitive_values(new_value, original_nested) elif isinstance(new_value, list): - # Process list items (preserve original if matching structure) original_list = original_config.get(key, []) - preserved_config[key] = [ - preserve_sensitive_values(new_item, orig_item) - if isinstance(new_item, dict) and isinstance(orig_item, dict) - else new_item - for new_item, orig_item in zip(new_value, original_list) - ] if len(new_value) == len(original_list) else new_value + preserved_list = [] + for i, new_item in enumerate(new_value): + if i < len(original_list): + orig_item = original_list[i] + if isinstance(new_item, dict) and isinstance(orig_item, dict): + preserved_list.append(preserve_sensitive_values(new_item, orig_item)) + else: + preserved_list.append(new_item) + else: + preserved_list.append(new_item) + preserved_config[key] = preserved_list else: # Keep non-sensitive/non-masked values preserved_config[key] = new_value @@ -139,7 +143,6 @@ def mask_sensitive_info(text: str) -> str: for field in SENSITIVE_FIELDS: patterns = [ - rf'"{field}"\s*:\s*"[^"]*"', rf'"{field}"\s*:\s*"[^"]*"', rf'{field}\s*=\s*[^\s,\]]+', rf"'{field}'\s*:\s*'[^']*'", @@ -147,10 +150,10 @@ def mask_sensitive_info(text: str) -> str: for pattern in patterns: if '"' in pattern: - masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text) + masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text, flags=re.IGNORECASE) elif "'" in pattern: - masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text) + masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text, flags=re.IGNORECASE) else: - masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text) + masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text, flags=re.IGNORECASE) return masked_text \ No newline at end of file diff --git a/runtime/datamate-python/app/module/collection/client/datax_client.py b/runtime/datamate-python/app/module/collection/client/datax_client.py index 42c09a86..85a6dc28 100644 --- a/runtime/datamate-python/app/module/collection/client/datax_client.py +++ b/runtime/datamate-python/app/module/collection/client/datax_client.py @@ -119,25 +119,19 @@ def run_datax_job(self): Returns: 执行结果字典 """ - # 创建配置文件 - self.create__config_file() + if not self.execution.started_at: + self.execution.started_at = datetime.now() + try: - # 构建命令 + self.create__config_file() cmd = [self.python_path, str(self.datax_main), str(self.config_file_path)] cmd_str = ' '.join(cmd) logger.info(f"执行命令: {cmd_str}") - if not self.execution.started_at: - self.execution.started_at = datetime.now() - # 执行命令并写入日志 with open(self.execution.log_path, 'w', encoding='utf-8') as log_f: - # 写入头信息 self.write_header_log(cmd_str, log_f) - # 启动datax进程 exit_code = self._run_process(cmd, log_f) - # 记录结束时间 self.execution.completed_at = datetime.now() self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds() - # 写入结束信息 self.write_tail_log(exit_code, log_f) if exit_code == 0: logger.info(f"DataX 任务执行成功: {self.execution.id}") @@ -150,17 +144,17 @@ def run_datax_job(self): logger.error(self.execution.error_message) except Exception as e: self.execution.completed_at = datetime.now() - self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds() + if self.execution.started_at: + self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds() self.execution.error_message = f"执行异常: {e}" self.execution.status = TaskStatus.FAILED.name logger.error(f"执行异常: {e}", exc_info=True) + with open(self.execution.log_path, 'w', encoding='utf-8') as log_f: + log_f.write(f"任务执行失败: {e}\n") - # 根据同步模式更新任务状态 if self.task.sync_mode == SyncMode.ONCE: - # 一次性任务:使用执行结果作为最终状态 self.task.status = self.execution.status else: - # 定时任务:恢复为 PENDING 状态,等待下次执行 self.task.status = TaskStatus.PENDING.name def rename_collection_result(self): diff --git a/runtime/datamate-python/app/module/collection/interface/collection.py b/runtime/datamate-python/app/module/collection/interface/collection.py index 998a19cc..0d1cbef9 100644 --- a/runtime/datamate-python/app/module/collection/interface/collection.py +++ b/runtime/datamate-python/app/module/collection/interface/collection.py @@ -315,13 +315,14 @@ async def update_task( template = template.scalar_one_or_none() if template: # Use preserved config to generate DataX config + config_obj = CollectionConfig(**preserved_config) DataxClient.generate_datx_config( - CollectionConfig(**preserved_config), + config_obj, template, task.target_path ) - # Save preserved config to database - task.config = json.dumps(preserved_config) + # Save the modified config (with regenerated job) to database + task.config = json.dumps(config_obj.model_dump()) # 如果任务处于 FAILED 状态,修改后重置为 PENDING,允许重新执行 if task.status == TaskStatus.FAILED.name: