Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,11 @@ Thumbs.db
# Milvus
**/volumes/

**/rag_storage/
**/rag_storage/

# AI tools/agent
.agents/
.claude/
data/
openspec/
devspace.yaml
159 changes: 159 additions & 0 deletions runtime/datamate-python/app/core/security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Security utilities for handling sensitive data"""
import re
from typing import Any, Dict


SENSITIVE_FIELDS = [
"secretKey",
"accessKey",
"password",
"passwd",
"pwd",
"secret",
"token",
"apiKey",
"api_key",
"access_key",
"secret_key",
]

MASK_PATTERN = "**********"


def mask_sensitive_value(value: str) -> str:
"""Mask a single sensitive value"""
return MASK_PATTERN


def is_masked_value(value: str) -> bool:
"""Check if a value is already masked"""
return value == MASK_PATTERN


def preserve_sensitive_values(
new_config: Dict[str, Any],
original_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Preserve original sensitive values if masked pattern is detected in update request.

When frontend receives masked values and sends them back in update request,
this function detects masked values and replaces them with original values
from database.

Args:
new_config: Config from update request (may contain masked values)
original_config: Original config from database (contains real values)

Returns:
Config with original sensitive values preserved
"""
if not isinstance(new_config, dict) or not isinstance(original_config, dict):
return new_config

preserved_config = {}
for key, new_value in new_config.items():
# Check if key is a sensitive field
key_lower = key.lower()
is_sensitive = any(
field.lower() == key_lower or field.lower() in key_lower
for field in SENSITIVE_FIELDS
)

# If value is masked and field is sensitive, use original value
if is_sensitive and isinstance(new_value, str) and is_masked_value(new_value):
original_value = original_config.get(key)
preserved_config[key] = original_value if original_value else new_value
elif isinstance(new_value, dict):
# Recursively process nested dictionaries
original_nested = original_config.get(key, {})
preserved_config[key] = preserve_sensitive_values(new_value, original_nested)
elif isinstance(new_value, list):
original_list = original_config.get(key, [])
preserved_list = []
for i, new_item in enumerate(new_value):
if i < len(original_list):
orig_item = original_list[i]
if isinstance(new_item, dict) and isinstance(orig_item, dict):
preserved_list.append(preserve_sensitive_values(new_item, orig_item))
else:
preserved_list.append(new_item)
else:
preserved_list.append(new_item)
preserved_config[key] = preserved_list
else:
# Keep non-sensitive/non-masked values
preserved_config[key] = new_value

return preserved_config


def mask_sensitive_dict(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Recursively mask sensitive fields in a dictionary.

Args:
data: Dictionary that may contain sensitive fields

Returns:
Dictionary with sensitive values masked
"""
if not isinstance(data, dict):
return data

masked_data = {}
for key, value in data.items():
# Check if the key is a sensitive field (case-insensitive)
key_lower = key.lower()
is_sensitive = any(
field.lower() == key_lower or field.lower() in key_lower
for field in SENSITIVE_FIELDS
)

if is_sensitive and isinstance(value, str):
# Mask the sensitive value
masked_data[key] = MASK_PATTERN
elif isinstance(value, dict):
# Recursively mask nested dictionaries
masked_data[key] = mask_sensitive_dict(value)
elif isinstance(value, list):
# Process list items
masked_data[key] = [
mask_sensitive_dict(item) if isinstance(item, dict) else item
for item in value
]
else:
# Keep non-sensitive values as-is
masked_data[key] = value

return masked_data


def mask_sensitive_info(text: str) -> str:
"""
Mask sensitive information in text by replacing values with **********

Args:
text: Original text that may contain sensitive information

Returns:
Text with sensitive values masked
"""
masked_text = text

for field in SENSITIVE_FIELDS:
patterns = [
rf'"{field}"\s*:\s*"[^"]*"',
rf'{field}\s*=\s*[^\s,\]]+',
rf"'{field}'\s*:\s*'[^']*'",
]

for pattern in patterns:
if '"' in pattern:
masked_text = re.sub(pattern, f'"{field}": "{MASK_PATTERN}"', masked_text, flags=re.IGNORECASE)
elif "'" in pattern:
masked_text = re.sub(pattern, f"'{field}': '{MASK_PATTERN}'", masked_text, flags=re.IGNORECASE)
else:
masked_text = re.sub(pattern, f'{field}={MASK_PATTERN}', masked_text, flags=re.IGNORECASE)

return masked_text
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Dict, Any

from app.core.logging import get_logger
from app.core.security import mask_sensitive_info
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
from app.module.collection.schema.collection import CollectionConfig, SyncMode
from app.module.shared.schema import TaskStatus
Expand Down Expand Up @@ -118,25 +119,19 @@ def run_datax_job(self):
Returns:
执行结果字典
"""
# 创建配置文件
self.create__config_file()
if not self.execution.started_at:
self.execution.started_at = datetime.now()

try:
# 构建命令
self.create__config_file()
cmd = [self.python_path, str(self.datax_main), str(self.config_file_path)]
cmd_str = ' '.join(cmd)
logger.info(f"执行命令: {cmd_str}")
if not self.execution.started_at:
self.execution.started_at = datetime.now()
# 执行命令并写入日志
with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
# 写入头信息
self.write_header_log(cmd_str, log_f)
# 启动datax进程
exit_code = self._run_process(cmd, log_f)
# 记录结束时间
self.execution.completed_at = datetime.now()
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
# 写入结束信息
self.write_tail_log(exit_code, log_f)
if exit_code == 0:
logger.info(f"DataX 任务执行成功: {self.execution.id}")
Expand All @@ -149,17 +144,17 @@ def run_datax_job(self):
logger.error(self.execution.error_message)
except Exception as e:
self.execution.completed_at = datetime.now()
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
if self.execution.started_at:
self.execution.duration_seconds = (self.execution.completed_at - self.execution.started_at).total_seconds()
self.execution.error_message = f"执行异常: {e}"
self.execution.status = TaskStatus.FAILED.name
logger.error(f"执行异常: {e}", exc_info=True)
with open(self.execution.log_path, 'w', encoding='utf-8') as log_f:
log_f.write(f"任务执行失败: {e}\n")

# 根据同步模式更新任务状态
if self.task.sync_mode == SyncMode.ONCE:
# 一次性任务:使用执行结果作为最终状态
self.task.status = self.execution.status
else:
# 定时任务:恢复为 PENDING 状态,等待下次执行
self.task.status = TaskStatus.PENDING.name

def rename_collection_result(self):
Expand Down Expand Up @@ -229,10 +224,12 @@ def write_header_log(self, cmd: str, log_f):

@staticmethod
def read_stream(stream, log_f):
"""读取输出流"""
"""读取输出流并屏蔽敏感信息"""
for line in stream:
line = line.rstrip('\n')
if line:
# Mask sensitive information before writing to log
masked_line = mask_sensitive_info(line)
# 写入日志文件
log_f.write(f"{line}\n")
log_f.write(f"{masked_line}\n")
log_f.flush()
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@

from app.core.exception import ErrorCodes, BusinessError, SuccessResponse, transaction
from app.core.logging import get_logger
from app.core.security import preserve_sensitive_values
from app.db.models import Dataset, DatasetFiles
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
from app.db.session import get_db
from app.module.collection.client.datax_client import DataxClient
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, converter_to_response, \
from app.module.collection.schema.collection import CollectionTaskBase, CollectionTaskCreate, CollectionTaskUpdate, CollectionConfig, converter_to_response, \
convert_for_create, SyncMode
from app.module.collection.schedule import schedule_collection_task, remove_collection_task
from app.module.collection.service.collection import CollectionTaskService
Expand Down Expand Up @@ -300,12 +301,28 @@ async def update_task(
reschedule_collection_task(task_id, task.schedule_expression)

if 'config' in update_data:
# 重新生成任务配置文件
# Get original config from database
original_config = json.loads(task.config) if task.config else {}

# Preserve sensitive values if masked pattern detected
preserved_config = preserve_sensitive_values(
request.config.dict(),
original_config
)

# Regenerate task config file with preserved sensitive values
template = await db.execute(select(CollectionTemplate).where(CollectionTemplate.id == task.template_id))
template = template.scalar_one_or_none()
if template:
DataxClient.generate_datx_config(request.config, template, task.target_path)
task.config = json.dumps(request.config.dict())
# Use preserved config to generate DataX config
config_obj = CollectionConfig(**preserved_config)
DataxClient.generate_datx_config(
config_obj,
template,
task.target_path
)
# Save the modified config (with regenerated job) to database
task.config = json.dumps(config_obj.model_dump())

# 如果任务处于 FAILED 状态,修改后重置为 PENDING,允许重新执行
if task.status == TaskStatus.FAILED.name:
Expand Down
15 changes: 12 additions & 3 deletions runtime/datamate-python/app/module/collection/schema/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional
from typing import Optional, Dict, Any

from pydantic import BaseModel, Field, validator, ConfigDict, field_validator
from pydantic import BaseModel, Field, validator, ConfigDict, field_validator, field_serializer
from pydantic.alias_generators import to_camel

from app.core.security import mask_sensitive_dict
from app.db.models.data_collection import CollectionTask, TaskExecution, CollectionTemplate
from app.module.dataset.schema import DatasetTypeResponse
from app.module.dataset.schema.dataset import DatasetType
Expand All @@ -23,6 +24,13 @@ class CollectionConfig(BaseModel):
writer: Optional[dict] = Field(None, description="writer参数")
job: Optional[dict] = Field(None, description="任务配置")

@field_serializer('parameter', 'reader', 'writer', 'job', when_used='json')
def mask_sensitive_fields(self, value: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Mask sensitive fields when serializing to JSON"""
if value is None:
return value
return mask_sensitive_dict(value)

class CollectionTaskBase(BaseModel):
id: str = Field(..., description="任务id")
name: str = Field(..., description="任务名称")
Expand Down Expand Up @@ -93,6 +101,7 @@ def validate_timeout(cls, v):
)

def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
config_dict = json.loads(task.config)
return CollectionTaskBase(
id=task.id,
name=task.name,
Expand All @@ -101,7 +110,7 @@ def converter_to_response(task: CollectionTask) -> CollectionTaskBase:
template_id=task.template_id,
template_name=task.template_name,
target_path=task.target_path,
config=json.loads(task.config),
config=CollectionConfig(**config_dict),
schedule_expression=task.schedule_expression,
status=task.status,
retry_count=task.retry_count,
Expand Down
Loading