diff --git a/.github/workflows/sync-main-into-dev.yml b/.github/workflows/sync-main-into-dev.yml new file mode 100644 index 000000000..41b38004f --- /dev/null +++ b/.github/workflows/sync-main-into-dev.yml @@ -0,0 +1,126 @@ +name: Sync main into dev + +on: + push: + branches: + - main + workflow_dispatch: + inputs: + sync_label: + description: "Optional label to use in the sync branch and PR title" + required: false + type: string + +permissions: + contents: write + pull-requests: write + +concurrency: + group: sync-main-into-dev-${{ github.sha || github.run_id }} + cancel-in-progress: false + +jobs: + sync-main-into-dev: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Configure git + run: | + git config user.name "github-actions[bot]" + git config user.email "github-actions[bot]@users.noreply.github.com" + + - name: Fetch branches + run: git fetch origin main dev + + - name: Resolve sync label + id: sync + env: + INPUT_SYNC_LABEL: ${{ github.event.inputs.sync_label }} + run: | + set -euo pipefail + + main_sha="$(git rev-parse --short=12 origin/main)" + sync_label="${INPUT_SYNC_LABEL:-main-${main_sha}}" + safe_label="$(printf '%s' "$sync_label" | tr -c 'A-Za-z0-9._-' '-' | sed -e 's/^-*//' -e 's/-*$//')" + + if [ -z "$safe_label" ]; then + safe_label="main-${main_sha}" + fi + + echo "label=$sync_label" >> "$GITHUB_OUTPUT" + echo "safe_label=$safe_label" >> "$GITHUB_OUTPUT" + + - name: Check branch difference + id: diff + run: | + set -euo pipefail + + counts="$(git rev-list --left-right --count origin/main...origin/dev)" + read -r main_only dev_only <<< "$counts" + + echo "main_only=$main_only" >> "$GITHUB_OUTPUT" + echo "dev_only=$dev_only" >> "$GITHUB_OUTPUT" + echo "origin/main only: $main_only" + echo "origin/dev only: $dev_only" + + - name: Skip when dev is not behind main + if: steps.diff.outputs.main_only == '0' + run: echo "dev is not behind main; nothing to sync." + + - name: Merge main into generated sync branch + if: steps.diff.outputs.main_only != '0' + id: merge + env: + SAFE_SYNC_LABEL: ${{ steps.sync.outputs.safe_label }} + SYNC_LABEL: ${{ steps.sync.outputs.label }} + run: | + set -euo pipefail + + branch="chore/sync-main-into-dev-${SAFE_SYNC_LABEL}" + + git switch -C "$branch" origin/dev + git merge --no-ff origin/main -m "chore: sync main into dev after ${SYNC_LABEL}" + git push --force-with-lease -u origin "$branch" + + echo "branch=$branch" >> "$GITHUB_OUTPUT" + + - name: Create or reuse pull request + if: steps.diff.outputs.main_only != '0' + env: + GH_TOKEN: ${{ github.token }} + SYNC_BRANCH: ${{ steps.merge.outputs.branch }} + SYNC_LABEL: ${{ steps.sync.outputs.label }} + MAIN_ONLY: ${{ steps.diff.outputs.main_only }} + DEV_ONLY: ${{ steps.diff.outputs.dev_only }} + run: | + set -euo pipefail + + existing_pr="$(gh pr list --base dev --head "$SYNC_BRANCH" --state open --json url --jq '.[0].url // ""')" + if [ -n "$existing_pr" ]; then + echo "Existing sync PR: $existing_pr" + exit 0 + fi + + body_file="$(mktemp)" + cat > "$body_file" <Flocks + +

+ AI-Native SecOps Platform +

+ +

+ Docs + Release +

+ +

+ License + English + Chinese +

![Flocks WebUI](assets/flocks.webp) diff --git a/README_zh.md b/README_zh.md index f84f6866e..0c05f3966 100644 --- a/README_zh.md +++ b/README_zh.md @@ -1,8 +1,19 @@ -# Flocks - -[English](README.md) | **简体中文** - -AI 原生 SecOps 平台 +

Flocks

+ +

+ AI 原生 SecOps 平台 +

+ +

+ 文档 + Release +

+ +

+ License + English + 中文 +

![Flocks Web](assets/flocks.webp) diff --git a/flocks/cli/service_manager.py b/flocks/cli/service_manager.py index edb207108..df534ffe7 100644 --- a/flocks/cli/service_manager.py +++ b/flocks/cli/service_manager.py @@ -34,6 +34,8 @@ MIN_NODE_MAJOR = 22 FOLLOW_POLL_INTERVAL = 0.5 +MAX_SERVICE_LOG_BYTES = 1024 * 1024 * 1024 +LOG_TRIM_CHUNK_BYTES = 1024 * 1024 WEBUI_DIRECT_BACKEND_URLS_ENV = "FLOCKS_WEBUI_DIRECT_BACKEND_URLS" DEFAULT_FLOCKS_CONSOLE_BASE_URL = "https://portalflocks.threatbook.cn" DEFAULT_VITE_ADDITIONAL_SERVER_ALLOWED_HOSTS = "portalflocks.threatbook.cn" @@ -41,6 +43,11 @@ "未检测到 lsof 或 fuser,无法解析端口占用 PID;将退回到 bind 检查。" "可尝试安装:apt/yum install lsof -y" ) +WINDOWS_FRONTEND_BUILD_ASSERTION_MARKERS = ( + "UV_HANDLE_CLOSING", + "src\\win\\async.c", + "src/win/async.c", +) class ServiceError(RuntimeError): @@ -557,10 +564,12 @@ def _windows_process_snapshot(pid: int) -> dict[str, str] | None: check=False, capture_output=True, text=True, + encoding="utf-8", + errors="replace", ) if completed.returncode == 0: with contextlib.suppress(json.JSONDecodeError): - payload = json.loads(completed.stdout.strip() or "{}") + payload = json.loads((completed.stdout or "").strip() or "{}") if isinstance(payload, dict): return { "name": str(payload.get("Name") or ""), @@ -1036,14 +1045,20 @@ def start_frontend(config: ServiceConfig, console) -> None: frontend_env = build_frontend_env(config) if not config.skip_frontend_build: console.print("[flocks] 构建 WebUI...") - completed = subprocess.run( - [npm, "run", "build"], - cwd=webui_dir, - check=False, - env=frontend_env, - ) + run_kwargs: dict[str, object] = {"cwd": webui_dir, "check": False, "env": frontend_env} + if sys.platform == "win32": + run_kwargs.update({"capture_output": True, "text": True, "encoding": "utf-8", "errors": "replace"}) + completed = subprocess.run([npm, "run", "build"], **run_kwargs) if completed.returncode != 0: - raise ServiceError("WebUI 构建失败。") + output = "\n".join( + value for value in (getattr(completed, "stdout", None), getattr(completed, "stderr", None)) if value + ) + if windows_frontend_build_assertion_is_recoverable(webui_dir, output): + console.print("[flocks] WebUI 构建产物已生成,忽略 Windows Node.js 退出断言。") + else: + if output: + console.print(output) + raise ServiceError("WebUI 构建失败。") command = [ npm, @@ -1612,6 +1627,15 @@ def websocket_access_base_url(config: ServiceConfig) -> str: return _http_to_ws_url(backend_access_base_url(config)) +def windows_frontend_build_assertion_is_recoverable(webui_dir: Path, output: str) -> bool: + """Return True when Windows npm crashed after producing a usable build.""" + if sys.platform != "win32": + return False + if not (webui_dir / "dist" / "index.html").exists(): + return False + return any(marker in output for marker in WINDOWS_FRONTEND_BUILD_ASSERTION_MARKERS) + + def build_frontend_env(config: ServiceConfig) -> dict[str, str]: """Build frontend proxy environment variables from backend service settings.""" env = os.environ.copy() @@ -1680,6 +1704,7 @@ def _spawn_process( kwargs["start_new_session"] = True log_path.parent.mkdir(parents=True, exist_ok=True) + _cap_service_log_file(log_path, MAX_SERVICE_LOG_BYTES) handle = log_path.open("a", encoding="utf-8") try: return subprocess.Popen( @@ -1696,6 +1721,40 @@ def _spawn_process( handle.close() +def _cap_service_log_file(log_path: Path, max_bytes: int = MAX_SERVICE_LOG_BYTES) -> bool: + """Keep service logs under *max_bytes* without deleting or renaming them.""" + if max_bytes <= 0: + return False + try: + size = log_path.stat().st_size + except FileNotFoundError: + return False + except OSError: + return False + if size <= max_bytes: + return False + + read_offset = size - max_bytes + write_offset = 0 + try: + with log_path.open("r+b") as handle: + while read_offset < size: + handle.seek(read_offset) + chunk = handle.read(min(LOG_TRIM_CHUNK_BYTES, size - read_offset)) + if not chunk: + break + handle.seek(write_offset) + handle.write(chunk) + read_offset += len(chunk) + write_offset += len(chunk) + handle.truncate(write_offset) + return True + except OSError: + # Logging must not make daemon startup fail. If Windows still has a + # transient lock, leave the file untouched and continue with append. + return False + + def _run_windows_netstat(port: int) -> str: completed = subprocess.run( ["netstat", "-ano", "-p", "tcp"], diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index 62d846d14..b3f4bc1b9 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -448,22 +448,28 @@ async def _cleanup_workflow_storage(workflow_id: str) -> None: await _remove_storage_key_if_exists(f"{_WORKFLOW_CENTER_RUNTIME_PREFIX}{workflow_id}") await _remove_storage_key_if_exists(f"{_WORKFLOW_CENTER_LOCAL_PID_PREFIX}{workflow_id}") await _remove_storage_prefix(f"{_WORKFLOW_CENTER_RELEASE_PREFIX}{workflow_id}/") + await _remove_storage_prefix(f"workflow_execution_index/{workflow_id}/") try: - exec_keys = await Storage.list("workflow_execution/") + exec_keys = await Storage.list_keys("workflow_execution/") for key in exec_keys: try: exec_data = await Storage.read(key) if isinstance(exec_data, dict) and exec_data.get("workflowId") == workflow_id: - await Storage.remove(key) exec_id = key.rsplit("/", 1)[-1] - step_rows = await Storage.list_raw(_workflow_execution_step_prefix(exec_id)) - for step_key, _value in step_rows: - await Storage.remove(step_key) - except Exception: - pass - except Exception: - pass + await Storage.clear(_workflow_execution_step_prefix(exec_id)) + await Storage.remove(key) + except Exception as exc: + log.warning("workflow.delete.execution_cleanup_failed", { + "workflow_id": workflow_id, + "key": key, + "error": str(exc), + }) + except Exception as exc: + log.warning("workflow.delete.execution_scan_failed", { + "workflow_id": workflow_id, + "error": str(exc), + }) service_dir = Config.get_data_path() / "workflow-services" / "workflows" / workflow_id if service_dir.is_dir(): diff --git a/flocks/session/runner.py b/flocks/session/runner.py index 22406ce56..1e6646541 100644 --- a/flocks/session/runner.py +++ b/flocks/session/runner.py @@ -1517,6 +1517,7 @@ async def _build_device_asset_hint(self) -> Optional[str]: "## 安全设备使用\n\n" f"{summary}\n\n" "当用户要操作特定机房、设备或产品时,先调用 `device_context` 获取 `device_id` 等相关信息。" + "如果当前无已接入设备,请提示用户前往「设备接入」页面添加设备。" "使用 `tool_search` 搜索工具名称查看用法;执行设备工具时必须传入目标 `device_id`。" "如果同类设备有多个候选,不要猜测,先询问用户选择。" ) diff --git a/flocks/storage/storage.py b/flocks/storage/storage.py index 5614ef39a..77bcc5563 100644 --- a/flocks/storage/storage.py +++ b/flocks/storage/storage.py @@ -93,6 +93,23 @@ class Storage: _sqlite_synchronous = "NORMAL" _sqlite_write_retry_attempts = 6 _sqlite_write_retry_base_delay_s = 0.05 + _multi_db_migration_marker_key = "storage.migration.multi_db.v1" + _multi_db_migration_batch_size = 500 + _workflow_key_prefixes = ( + "workflow/", + "workflow_execution/", + "workflow_execution_index/", + "workflow_execution_step/", + "workflow_registry/", + "workflow_release/", + "workflow_runtime/", + "workflow_local_pid/", + "workflow_api_service/", + "workflow_integration_config/", + "workflow_kafka_config/", + "workflow_poller_config/", + "workflow_syslog_config/", + ) # Substrings that mark an SQLite file as unrecoverably damaged at open # time. We deliberately keep this list short and English-only because @@ -130,6 +147,32 @@ def get_db_path(cls) -> Path: data_dir = Config.get_data_path() return data_dir / "flocks.db" + @classmethod + def get_workflow_db_path(cls) -> Path: + """Return the SQLite database path for workflow-domain KV data.""" + return cls.get_db_path().with_name("workflow.db") + + @classmethod + def route_db_path_for_key(cls, key: str) -> Path: + """Return the storage DB path that owns *key*.""" + if cls._is_workflow_key(key): + return cls.get_workflow_db_path() + return cls.get_db_path() + + @classmethod + def route_db_path_for_prefix(cls, prefix: Optional[str]) -> Path: + """Return the storage DB path for a prefix-scoped KV operation.""" + if prefix and ( + cls._is_workflow_key(prefix) + or f"{prefix}/" in cls._workflow_key_prefixes + ): + return cls.get_workflow_db_path() + return cls.get_db_path() + + @classmethod + def _is_workflow_key(cls, key: str) -> bool: + return any(key.startswith(prefix) for prefix in cls._workflow_key_prefixes) + @classmethod async def configure_connection( cls, conn: aiosqlite.Connection @@ -426,6 +469,236 @@ def _resolve_key(key: List[str] | str) -> str: if isinstance(key, list): return "/".join(key) return key + + @classmethod + def _like_prefix_pattern(cls, prefix: str) -> str: + """Return a SQLite LIKE pattern that treats prefix chars literally.""" + return ( + prefix.replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_") + + "%" + ) + + @classmethod + def _like_prefix_clause(cls, column: str = "key") -> str: + return f"{column} LIKE ? ESCAPE '\\'" + + @classmethod + def _workflow_prefix_filter(cls) -> tuple[str, tuple[str, ...]]: + clause = " OR ".join( + cls._like_prefix_clause("key") for _ in cls._workflow_key_prefixes + ) + params = tuple( + cls._like_prefix_pattern(prefix) + for prefix in cls._workflow_key_prefixes + ) + return clause, params + + @staticmethod + def _marker_row_count(marker: Dict[str, Any], key: str) -> int: + try: + return int(marker.get(key) or 0) + except (TypeError, ValueError): + return 0 + + @classmethod + async def _ensure_storage_table(cls, db_path: Path) -> None: + """Ensure the generic KV storage table exists in *db_path*.""" + db_path.parent.mkdir(parents=True, exist_ok=True) + + async def _create_storage_table() -> None: + async with cls.connect(db_path) as db: + await db.execute(""" + CREATE TABLE IF NOT EXISTS storage ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + await db.commit() + + await cls._run_write_with_retry( + _create_storage_table, + action="init.create_storage_table", + target=str(db_path), + ) + + @classmethod + def _read_multi_db_migration_marker_sync(cls) -> Dict[str, Any]: + if cls._db_path is None or not cls._db_path.exists(): + return {} + try: + conn = cls.connect_sync(cls._db_path) + try: + row = conn.execute( + "SELECT value FROM storage WHERE key = ?", + (cls._multi_db_migration_marker_key,), + ).fetchone() + finally: + conn.close() + if row is None: + return {} + value = json.loads(row["value"]) + return value if isinstance(value, dict) else {} + except Exception: + return {} + + @classmethod + def _write_multi_db_migration_marker_sync(cls, marker: Dict[str, Any]) -> None: + if cls._db_path is None: + raise StorageError("Storage DB path is not initialized") + from datetime import UTC + + now = datetime.now(UTC).isoformat() + serialized = json.dumps(marker) + conn = cls.connect_sync(cls._db_path) + try: + conn.execute( + """ + INSERT OR REPLACE INTO storage (key, value, type, created_at, updated_at) + VALUES (?, ?, ?, + COALESCE((SELECT created_at FROM storage WHERE key = ?), ?), + ?) + """, + ( + cls._multi_db_migration_marker_key, + serialized, + "json", + cls._multi_db_migration_marker_key, + now, + now, + ), + ) + conn.commit() + finally: + conn.close() + + @classmethod + def _copy_workflow_kv_to_workflow_db_sync(cls) -> tuple[int, int]: + if cls._db_path is None: + raise StorageError("Storage DB path is not initialized") + workflow_db_path = cls.get_workflow_db_path() + source = cls.connect_sync(cls._db_path) + target = cls.connect_sync(workflow_db_path) + try: + clauses, params = cls._workflow_prefix_filter() + total_migrated = 0 + last_key = "" + while True: + rows = source.execute( + f""" + SELECT key, value, type, created_at, updated_at + FROM storage + WHERE ({clauses}) AND key > ? + ORDER BY key + LIMIT ? + """, + (*params, last_key, cls._multi_db_migration_batch_size), + ).fetchall() + if not rows: + break + + target.executemany( + """ + INSERT OR REPLACE INTO storage (key, value, type, created_at, updated_at) + VALUES (?, ?, ?, ?, ?) + """, + [ + ( + row["key"], + row["value"], + row["type"], + row["created_at"], + row["updated_at"], + ) + for row in rows + ], + ) + target.commit() + + keys = [row["key"] for row in rows] + placeholders = ", ".join("?" for _ in keys) + copied = target.execute( + f"SELECT COUNT(*) FROM storage WHERE key IN ({placeholders})", + keys, + ).fetchone()[0] + if copied != len(keys): + raise StorageError( + "Workflow storage migration verification failed: " + f"expected {len(keys)} copied rows, got {copied}" + ) + + total_migrated += len(rows) + last_key = keys[-1] + + return total_migrated, 0 + except Exception: + if source.in_transaction: + source.rollback() + if target.in_transaction: + target.rollback() + raise + finally: + source.close() + target.close() + + @classmethod + async def _migrate_workflow_storage_to_workflow_db( + cls, + *, + workflow_db_existed_before_init: bool, + ) -> None: + """Copy legacy workflow-domain KV rows from flocks.db to workflow.db.""" + marker = await asyncio.to_thread(cls._read_multi_db_migration_marker_sync) + if marker.get("workflow_migrated") is True: + if ( + not workflow_db_existed_before_init + and cls._marker_row_count(marker, "workflow_rows") > 0 + ): + raise StorageError( + "workflow.db is missing after a completed multi-db migration" + ) + return + + from datetime import UTC + + marker.setdefault("version", 1) + marker.setdefault("started_at", datetime.now(UTC).isoformat()) + marker["source_db"] = str(cls.get_db_path()) + marker["workflow_db"] = str(cls.get_workflow_db_path()) + try: + migrated, deleted = await asyncio.to_thread( + cls._copy_workflow_kv_to_workflow_db_sync + ) + marker["workflow_migrated"] = True + marker["workflow_migrated_at"] = datetime.now(UTC).isoformat() + marker["workflow_rows"] = migrated + marker["workflow_source_rows_deleted"] = deleted + marker.pop("workflow_error", None) + await asyncio.to_thread(cls._write_multi_db_migration_marker_sync, marker) + cls._log.info("storage.multi_db.workflow_migrated", { + "rows": migrated, + "source_rows_deleted": deleted, + "source_db": marker["source_db"], + "workflow_db": marker["workflow_db"], + }) + except Exception as exc: + marker["workflow_migrated"] = False + marker["workflow_error"] = str(exc) + marker["workflow_failed_at"] = datetime.now(UTC).isoformat() + try: + await asyncio.to_thread(cls._write_multi_db_migration_marker_sync, marker) + except Exception: + pass + cls._log.error("storage.multi_db.workflow_migration_failed", { + "source_db": marker.get("source_db"), + "workflow_db": marker.get("workflow_db"), + "error": str(exc), + }) + raise @classmethod async def init(cls, db_path: Optional[Path] = None) -> None: @@ -501,6 +774,17 @@ async def init(cls, db_path: Optional[Path] = None) -> None: raise await cls._bootstrap_schema() + # Workflow-domain KV rows live in a sibling workflow.db. Keep the + # public Storage API unchanged by routing workflow key prefixes at + # the KV-operation seam, while initialising the sibling DB here so + # startup fails early if that database is unusable. + workflow_db_path = cls.get_workflow_db_path() + workflow_db_existed_before_init = workflow_db_path.exists() + await cls._ensure_storage_table(workflow_db_path) + await cls._migrate_workflow_storage_to_workflow_db( + workflow_db_existed_before_init=workflow_db_existed_before_init, + ) + # Drain any residual WAL frames left by the previous process so the # next ``SIGKILL`` does not have to truncate a 4 MB-class WAL during # recovery (which is exactly when main-DB page 1 / the header can @@ -666,24 +950,7 @@ async def _bootstrap_schema(cls) -> None: first call surfaces the corruption, and the second call (after the rename) bootstraps a fresh database in its place. """ - async def _create_storage_table() -> None: - async with cls.connect(cls._db_path) as db: - await db.execute(""" - CREATE TABLE IF NOT EXISTS storage ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL, - type TEXT NOT NULL, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL - ) - """) - await db.commit() - - await cls._run_write_with_retry( - _create_storage_table, - action="init.create_storage_table", - target=str(cls._db_path), - ) + await cls._ensure_storage_table(cls._db_path) # Initialize vector storage tables (for memory system) try: @@ -830,6 +1097,9 @@ async def set(cls, key: str, value: Any, value_type: str = "json") -> None: value_type: Type identifier for the value """ await cls._ensure_init() + db_path = cls.route_db_path_for_key(key) + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) if isinstance(value, BaseModel): serialized = value.model_dump_json() @@ -840,7 +1110,7 @@ async def set(cls, key: str, value: Any, value_type: str = "json") -> None: now = datetime.now(UTC).isoformat() async def _write() -> None: - async with cls.connect(cls._db_path) as db: + async with cls.connect(db_path) as db: await db.execute(""" INSERT OR REPLACE INTO storage (key, value, type, created_at, updated_at) VALUES (?, ?, ?, @@ -866,8 +1136,11 @@ async def get(cls, key: str, model: Optional[Type[T]] = None) -> Optional[T | An Stored value or None if not found """ await cls._ensure_init() + db_path = cls.route_db_path_for_key(key) + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) - async with cls.connect(cls._db_path) as db: + async with cls.connect(db_path) as db: async with db.execute( "SELECT value, type FROM storage WHERE key = ?", (key,) ) as cursor: @@ -897,9 +1170,12 @@ async def delete(cls, key: str) -> bool: True if deleted, False if not found """ await cls._ensure_init() + db_path = cls.route_db_path_for_key(key) + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) async def _delete() -> bool: - async with cls.connect(cls._db_path) as db: + async with cls.connect(db_path) as db: cursor = await db.execute("DELETE FROM storage WHERE key = ?", (key,)) await db.commit() return cursor.rowcount > 0 @@ -923,19 +1199,60 @@ async def list_keys(cls, prefix: Optional[str] = None) -> List[str]: List of matching keys """ await cls._ensure_init() + if prefix is None: + db_paths = (cls.get_db_path(), cls.get_workflow_db_path()) + else: + db_paths = (cls.route_db_path_for_prefix(prefix),) + for db_path in db_paths: + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) - async with cls.connect(cls._db_path) as db: - if prefix: - query = "SELECT key FROM storage WHERE key LIKE ?" - params = (f"{prefix}%",) - else: - query = "SELECT key FROM storage" - params = () - - async with db.execute(query, params) as cursor: - rows = await cursor.fetchall() + keys: set[str] = set() + for db_path in db_paths: + async with cls.connect(db_path) as db: + if prefix: + query = f"SELECT key FROM storage WHERE {cls._like_prefix_clause()}" + params = (cls._like_prefix_pattern(prefix),) + else: + query = "SELECT key FROM storage" + params = () + + async with db.execute(query, params) as cursor: + rows = await cursor.fetchall() + keys.update(row[0] for row in rows) - return [row[0] for row in rows] + return sorted(keys) + + @classmethod + async def _list_entry_rows( + cls, + *, + prefix: Optional[str], + ) -> List[Tuple[str, str]]: + if prefix is None: + db_paths = (cls.get_db_path(), cls.get_workflow_db_path()) + else: + db_paths = (cls.route_db_path_for_prefix(prefix),) + for db_path in db_paths: + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) + + rows_by_key: dict[str, str] = {} + for db_path in db_paths: + async with cls.connect(db_path) as db: + if prefix: + query = f"SELECT key, value FROM storage WHERE {cls._like_prefix_clause()}" + params = (cls._like_prefix_pattern(prefix),) + else: + query = "SELECT key, value FROM storage" + params = () + + async with db.execute(query, params) as cursor: + rows = await cursor.fetchall() + for key, value in rows: + rows_by_key[key] = value + + return sorted(rows_by_key.items(), key=lambda item: item[0]) @classmethod async def list_entries( @@ -957,17 +1274,7 @@ async def list_entries( List of ``(key, value)`` tuples """ await cls._ensure_init() - - async with cls.connect(cls._db_path) as db: - if prefix: - query = "SELECT key, value FROM storage WHERE key LIKE ?" - params = (f"{prefix}%",) - else: - query = "SELECT key, value FROM storage" - params = () - - async with db.execute(query, params) as cursor: - rows = await cursor.fetchall() + rows = await cls._list_entry_rows(prefix=prefix) entries: List[Tuple[str, T | Any]] = [] for key, value_str in rows: @@ -989,14 +1296,17 @@ async def list_entries_page( ) -> tuple[List[Tuple[str, T | Any]], int]: """List one page of entries for a prefix, plus total matching rows.""" await cls._ensure_init() + db_path = cls.route_db_path_for_prefix(prefix) + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) safe_offset = max(int(offset), 0) safe_limit = max(int(limit), 0) - params = (f"{prefix}%",) + params = (cls._like_prefix_pattern(prefix),) - async with cls.connect(cls._db_path) as db: + async with cls.connect(db_path) as db: async with db.execute( - "SELECT COUNT(*) FROM storage WHERE key LIKE ?", + f"SELECT COUNT(*) FROM storage WHERE {cls._like_prefix_clause()}", params, ) as cursor: row = await cursor.fetchone() @@ -1006,13 +1316,13 @@ async def list_entries_page( return [], total async with db.execute( - """ + f""" SELECT key, value FROM storage - WHERE key LIKE ? + WHERE {cls._like_prefix_clause()} ORDER BY key LIMIT ? OFFSET ? """, - (f"{prefix}%", safe_limit, safe_offset), + (cls._like_prefix_pattern(prefix), safe_limit, safe_offset), ) as cursor: rows = await cursor.fetchall() @@ -1039,19 +1349,7 @@ async def list_raw( Compatible with all SQLite versions (no ``json_extract`` required). """ await cls._ensure_init() - - if prefix: - query = "SELECT key, value FROM storage WHERE key LIKE ?" - params: tuple = (f"{prefix}%",) - else: - query = "SELECT key, value FROM storage" - params = () - - async with cls.connect(cls._db_path) as db: - async with db.execute(query, params) as cursor: - rows = await cursor.fetchall() - - return [(row[0], row[1]) for row in rows] + return await cls._list_entry_rows(prefix=prefix) @classmethod async def exists(cls, key: str) -> bool: @@ -1065,8 +1363,11 @@ async def exists(cls, key: str) -> bool: True if exists, False otherwise """ await cls._ensure_init() + db_path = cls.route_db_path_for_key(key) + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) - async with cls.connect(cls._db_path) as db: + async with cls.connect(db_path) as db: async with db.execute( "SELECT 1 FROM storage WHERE key = ?", (key,) ) as cursor: @@ -1086,25 +1387,39 @@ async def clear(cls, prefix: Optional[str] = None) -> int: Number of deleted entries """ await cls._ensure_init() + if prefix is None: + db_paths = (cls.get_db_path(), cls.get_workflow_db_path()) + else: + db_paths = (cls.route_db_path_for_prefix(prefix),) + for db_path in db_paths: + if db_path != cls.get_db_path(): + await cls._ensure_storage_table(db_path) - async def _clear() -> int: - async with cls.connect(cls._db_path) as db: + async def _clear_db(db_path: Path) -> int: + async with cls.connect(db_path) as db: if prefix: - query = "DELETE FROM storage WHERE key LIKE ?" - params = (f"{prefix}%",) + query = f"DELETE FROM storage WHERE {cls._like_prefix_clause()}" + params = (cls._like_prefix_pattern(prefix),) + cursor = await db.execute(query, params) else: - query = "DELETE FROM storage" - params = () + query = "DELETE FROM storage WHERE key != ?" + params = (cls._multi_db_migration_marker_key,) + cursor = await db.execute(query, params) + await db.execute( + "DELETE FROM storage WHERE key = ?", + (cls._multi_db_migration_marker_key,), + ) - cursor = await db.execute(query, params) await db.commit() return cursor.rowcount - deleted = await cls._run_write_with_retry( - _clear, - action="clear", - target=prefix or "", - ) + deleted = 0 + for db_path in db_paths: + deleted += await cls._run_write_with_retry( + lambda db_path=db_path: _clear_db(db_path), + action="clear", + target=f"{prefix or ''}@{db_path}", + ) cls._log.info("storage.clear", {"prefix": prefix, "deleted": deleted}) cls._invalidate_runtime_caches() diff --git a/flocks/task/manager.py b/flocks/task/manager.py index 4245ddbfb..de038a25a 100644 --- a/flocks/task/manager.py +++ b/flocks/task/manager.py @@ -820,29 +820,44 @@ def _clear_migration_state(cls) -> None: def _with_db_connection() -> sqlite3.Connection: return Storage.connect_sync() + @staticmethod + def _legacy_table_db_paths() -> list[Path]: + paths = [Storage.get_db_path(), TaskStore.get_db_path()] + unique: list[Path] = [] + for path in paths: + if path not in unique: + unique.append(path) + return unique + @classmethod def _legacy_tables_exist(cls) -> bool: - with cls._with_db_connection() as conn: - for table_name in ("tasks", "task_execution_records", "task_queue_refs"): - row = conn.execute( - "SELECT 1 FROM sqlite_master WHERE type='table' AND name = ?", - (table_name,), - ).fetchone() - if row is not None: - return True + for db_path in cls._legacy_table_db_paths(): + if not db_path.exists(): + continue + with Storage.connect_sync(db_path) as conn: + for table_name in ("tasks", "task_execution_records", "task_queue_refs"): + row = conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name = ?", + (table_name,), + ).fetchone() + if row is not None: + return True return False @classmethod def _drop_legacy_tables(cls) -> None: - with cls._with_db_connection() as conn: - conn.executescript( - """ - DROP TABLE IF EXISTS task_queue_refs; - DROP TABLE IF EXISTS task_execution_records; - DROP TABLE IF EXISTS tasks; - """ - ) - conn.commit() + for db_path in cls._legacy_table_db_paths(): + if not db_path.exists(): + continue + with Storage.connect_sync(db_path) as conn: + conn.executescript( + """ + DROP TABLE IF EXISTS task_queue_refs; + DROP TABLE IF EXISTS task_execution_records; + DROP TABLE IF EXISTS tasks; + """ + ) + conn.commit() @classmethod async def _enqueue_execution(cls, execution: TaskExecution) -> TaskExecution: diff --git a/flocks/task/store.py b/flocks/task/store.py index 56d505ae2..cbc5aa1b8 100644 --- a/flocks/task/store.py +++ b/flocks/task/store.py @@ -1,8 +1,11 @@ """Task Store — SQLite persistence for scheduler/execution domain.""" +import asyncio import json import os +import sqlite3 from datetime import datetime, timedelta, timezone +from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import aiosqlite @@ -33,6 +36,11 @@ class TaskStore: # connection across ``fork()`` corrupts the DB. _init_pid: Optional[int] = None + @classmethod + def get_db_path(cls) -> Path: + """Return the SQLite database path for task-center tables.""" + return Storage.get_db_path().with_name("tasks.db") + @classmethod async def init(cls) -> None: current_pid = os.getpid() @@ -50,19 +58,34 @@ async def init(cls) -> None: cls._initialized = False cls._init_pid = None await Storage._ensure_init() - cls._conn = await aiosqlite.connect( - Storage._db_path, - timeout=Storage._sqlite_timeout_s, - ) - await Storage.configure_connection(cls._conn) - await cls._conn.executescript(_TASKS_DDL) - for stmt in _INDEX_STMTS: - await cls._conn.execute(stmt) - await cls._conn.commit() - await cls._normalize_legacy_paused_executions() - cls._initialized = True - cls._init_pid = current_pid - log.info("task.store.initialized") + db_path = cls.get_db_path() + db_existed_before_init = db_path.exists() + db_path.parent.mkdir(parents=True, exist_ok=True) + try: + cls._conn = await aiosqlite.connect( + db_path, + timeout=Storage._sqlite_timeout_s, + ) + cls._conn.text_factory = cls._decode_legacy_text + await Storage.configure_connection(cls._conn) + await cls._conn.executescript(_TASKS_DDL) + for stmt in _INDEX_STMTS: + await cls._conn.execute(stmt) + await cls._conn.commit() + await cls._migrate_task_tables_to_tasks_db( + tasks_db_existed_before_init=db_existed_before_init, + ) + cls._initialized = True + cls._init_pid = current_pid + await cls._normalize_legacy_paused_executions() + log.info("task.store.initialized") + except Exception: + if cls._conn: + await cls._conn.close() + cls._conn = None + cls._initialized = False + cls._init_pid = None + raise @classmethod async def close(cls) -> None: @@ -82,7 +105,7 @@ async def _db(cls) -> aiosqlite.Connection: and cls._init_pid != os.getpid() ): await cls.init() - if not cls._conn: + if not cls._conn or not cls._initialized: await cls.init() return cls._conn # type: ignore[return-value] @@ -90,6 +113,190 @@ async def _db(cls) -> aiosqlite.Connection: async def raw_db(cls) -> aiosqlite.Connection: return await cls._db() + @staticmethod + def _decode_legacy_text(value: bytes) -> str: + try: + return value.decode("utf-8") + except UnicodeDecodeError: + return value.decode("gb18030", errors="replace") + + @staticmethod + def _quote_identifier(identifier: str) -> str: + return '"' + identifier.replace('"', '""') + '"' + + @staticmethod + def _table_exists(conn: sqlite3.Connection, table_name: str) -> bool: + row = conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name = ?", + (table_name,), + ).fetchone() + return row is not None + + @staticmethod + def _table_columns(conn: sqlite3.Connection, table_name: str) -> list[str]: + rows = conn.execute(f"PRAGMA table_info({TaskStore._quote_identifier(table_name)})").fetchall() + return [str(row["name"]) for row in rows] + + @classmethod + def _copy_task_table_sync( + cls, + source: sqlite3.Connection, + target: sqlite3.Connection, + table_name: str, + ) -> int: + if not cls._table_exists(source, table_name): + return 0 + source_columns = cls._table_columns(source, table_name) + target_columns = set(cls._table_columns(target, table_name)) + copy_columns = [column for column in source_columns if column in target_columns] + if not copy_columns or "id" not in copy_columns: + return 0 + quoted_columns = ", ".join(cls._quote_identifier(column) for column in copy_columns) + table_sql = cls._quote_identifier(table_name) + placeholders = ", ".join("?" for _ in copy_columns) + insert_sql = ( + f"INSERT OR REPLACE INTO {table_sql} ({quoted_columns}) " + f"VALUES ({placeholders})" + ) + copied = 0 + last_rowid = 0 + while True: + rows = source.execute( + f""" + SELECT rowid AS __rowid, {quoted_columns} + FROM {table_sql} + WHERE rowid > ? + ORDER BY rowid + LIMIT ? + """, + (last_rowid, Storage._multi_db_migration_batch_size), + ).fetchall() + if not rows: + break + + target.executemany( + insert_sql, + [tuple(row[column] for column in copy_columns) for row in rows], + ) + ids = [str(row["id"]) for row in rows] + placeholders = ", ".join("?" for _ in ids) + verified = target.execute( + f"SELECT COUNT(*) FROM {table_sql} WHERE id IN ({placeholders})", + ids, + ).fetchone()[0] + if verified != len(ids): + raise RuntimeError( + f"Task table migration verification failed for {table_name}: " + f"expected {len(ids)} copied rows, got {verified}" + ) + + copied += len(rows) + last_rowid = int(rows[-1]["__rowid"]) + + return copied + + @classmethod + def _copy_task_tables_to_tasks_db_sync(cls) -> tuple[int, int, int]: + source_path = Storage.get_db_path() + target_path = cls.get_db_path() + if not source_path.exists(): + return 0, 0, 0 + source = Storage.connect_sync(source_path) + target = Storage.connect_sync(target_path) + try: + source.text_factory = cls._decode_legacy_text + target.text_factory = cls._decode_legacy_text + source.execute("PRAGMA foreign_keys = OFF") + original_foreign_keys = target.execute("PRAGMA foreign_keys").fetchone()[0] + target.execute("PRAGMA foreign_keys = OFF") + total = 0 + for table_name in ( + "task_schedulers", + "task_executions", + "task_execution_queue_refs", + ): + copied = cls._copy_task_table_sync(source, target, table_name) + total += copied + target.commit() + foreign_key_violations = len(target.execute("PRAGMA foreign_key_check").fetchall()) + target.execute(f"PRAGMA foreign_keys = {original_foreign_keys}") + + return total, foreign_key_violations, 0 + except Exception: + if source.in_transaction: + source.rollback() + if target.in_transaction: + target.rollback() + try: + source.execute("PRAGMA foreign_keys = ON") + target.execute("PRAGMA foreign_keys = ON") + except Exception: + pass + raise + finally: + source.close() + target.close() + + @classmethod + async def _migrate_task_tables_to_tasks_db( + cls, + *, + tasks_db_existed_before_init: bool, + ) -> None: + marker = await asyncio.to_thread(Storage._read_multi_db_migration_marker_sync) + if marker.get("tasks_migrated") is True: + if ( + not tasks_db_existed_before_init + and Storage._marker_row_count(marker, "task_rows") > 0 + ): + raise RuntimeError( + "tasks.db is missing after a completed multi-db migration" + ) + return + + marker.setdefault("version", 1) + marker.setdefault("started_at", datetime.now(timezone.utc).isoformat()) + marker["source_db"] = str(Storage.get_db_path()) + marker["tasks_db"] = str(cls.get_db_path()) + try: + migrated, foreign_key_violations, deleted = await asyncio.to_thread( + cls._copy_task_tables_to_tasks_db_sync + ) + marker["tasks_migrated"] = True + marker["tasks_migrated_at"] = datetime.now(timezone.utc).isoformat() + marker["task_rows"] = migrated + marker["task_foreign_key_violations"] = foreign_key_violations + marker["task_source_rows_deleted"] = deleted + marker.pop("tasks_error", None) + await asyncio.to_thread(Storage._write_multi_db_migration_marker_sync, marker) + log.info("task.store.multi_db_migrated", { + "rows": migrated, + "source_rows_deleted": deleted, + "foreign_key_violations": foreign_key_violations, + "source_db": marker["source_db"], + "tasks_db": marker["tasks_db"], + }) + if foreign_key_violations: + log.warn("task.store.multi_db_migrated_with_legacy_fk_violations", { + "foreign_key_violations": foreign_key_violations, + "source_db": marker["source_db"], + "tasks_db": marker["tasks_db"], + }) + except Exception as exc: + marker["tasks_migrated"] = False + marker["tasks_error"] = str(exc) + marker["tasks_failed_at"] = datetime.now(timezone.utc).isoformat() + try: + await asyncio.to_thread(Storage._write_multi_db_migration_marker_sync, marker) + except Exception: + pass + log.error("task.store.multi_db_migration_failed", { + "source_db": marker.get("source_db"), + "tasks_db": marker.get("tasks_db"), + "error": str(exc), + }) + raise + # ------------------------------------------------------------------ # Scheduler CRUD # ------------------------------------------------------------------ diff --git a/flocks/tool/device/device_context_tool.py b/flocks/tool/device/device_context_tool.py index 418819655..ac9750263 100644 --- a/flocks/tool/device/device_context_tool.py +++ b/flocks/tool/device/device_context_tool.py @@ -36,7 +36,10 @@ async def device_context(ctx: ToolContext) -> ToolResult: if not content: return ToolResult( success=True, - output="当前没有已接入的安全设备。请前往「设备接入」页面添加设备后再试。", + output=( + "当前没有已接入的安全设备。" + "设备不存在时,请提醒用户前往设备接入页面添加设备。" + ), ) return ToolResult(success=True, output=content) except Exception as exc: diff --git a/flocks/tool/device/prompt.py b/flocks/tool/device/prompt.py index 4f652e8a8..62c28c94d 100644 --- a/flocks/tool/device/prompt.py +++ b/flocks/tool/device/prompt.py @@ -71,6 +71,8 @@ async def build_device_context_section() -> Optional[str]: lines: List[str] = [ "", + "设备不存在时,请提醒用户前往设备接入页面添加设备。", + "", ] # --- Section 1: device list (references tool_set_id only) --- diff --git a/flocks/tool/question_handler.py b/flocks/tool/question_handler.py index 21ee7a89c..ff4cedeae 100644 --- a/flocks/tool/question_handler.py +++ b/flocks/tool/question_handler.py @@ -177,6 +177,15 @@ async def api_question_handler( "session": session_id, "elapsed": elapsed }) + try: + from flocks.server.routes.event import publish_event + await publish_event("question.rejected", { + "sessionID": session_id, + "requestID": request_id, + "reason": "timeout", + }) + except Exception as event_error: + log.error("question.timeout.event_failed", {"error": str(event_error)}) raise TimeoutError( f"Question timed out after {timeout} seconds waiting for user response" ) diff --git a/flocks/updater/updater.py b/flocks/updater/updater.py index 51d9dcda9..ce6c3c09f 100644 --- a/flocks/updater/updater.py +++ b/flocks/updater/updater.py @@ -443,7 +443,7 @@ async def _build_frontend_workspace( continue try: - code, _, err = await _run_async( + code, out, err = await _run_async( [candidate.npm, "run", "build"], cwd=webui_dir, timeout=_FRONTEND_BUILD_TIMEOUT_SECONDS, @@ -464,6 +464,12 @@ async def _build_frontend_workspace( continue if code != 0: + from flocks.cli import service_manager + + build_output = "\n".join(value for value in (out, err) if value) + if service_manager.windows_frontend_build_assertion_is_recoverable(webui_dir, build_output): + final_frontend_error = None + break final_frontend_error = f"Frontend build failed: {err}" if is_last_attempt: break diff --git a/flocks/workflow/execution_store.py b/flocks/workflow/execution_store.py index 98ada8c44..f175f97ac 100644 --- a/flocks/workflow/execution_store.py +++ b/flocks/workflow/execution_store.py @@ -3,10 +3,9 @@ from __future__ import annotations import asyncio -import re import time import uuid -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple from flocks.session.recorder import Recorder from flocks.storage.storage import Storage @@ -203,16 +202,10 @@ def derive_loop_progress( # Keep this intentionally small so high-frequency workflows do not keep # inflating the SQLite row set and matching JSONL audit files indefinitely. _MAX_EXECUTION_HISTORY_PER_WORKFLOW = 30 -# Trim is an O(N) scan over all workflow_execution rows; only run it every Nth -# call per workflow to amortise the cost under high syslog throughput. -_TRIM_CHECK_INTERVAL = 5 -_trim_counters: Dict[str, int] = {} -# Workflows with an in-flight trim task. Because trims run as fire-and-forget -# ``asyncio.create_task`` background jobs, a slow trim under high syslog load -# could otherwise spawn many overlapping scans that each materialise table -# state simultaneously — the exact pattern that drove RSS to 20 GB. This -# guard ensures at most one trim per workflow is ever running. -_trim_in_flight: Set[str] = set() +# Per-workflow trim lock. Trims are awaited by the writer so the retention cap +# is enforced before ``record_execution_result`` returns, while concurrent runs +# for the same workflow serialize instead of skipping cleanup. +_trim_locks: Dict[str, asyncio.Lock] = {} # Per-workflow lock to serialize read-modify-write of stats. Concurrent # executions of the same workflow (e.g. syslog-triggered runs with @@ -233,6 +226,14 @@ def _workflow_stats_key(workflow_id: str) -> str: return f"workflow/{workflow_id}/stats" +def _get_trim_lock(workflow_id: str) -> asyncio.Lock: + lock = _trim_locks.get(workflow_id) + if lock is None: + lock = asyncio.Lock() + _trim_locks[workflow_id] = lock + return lock + + _DEFAULT_STATS: Dict[str, Any] = { "callCount": 0, "successCount": 0, @@ -280,6 +281,20 @@ def workflow_execution_key(exec_id: str) -> str: return f"workflow_execution/{exec_id}" +def workflow_execution_index_prefix(workflow_id: str) -> str: + """Return the storage prefix for one workflow's execution index.""" + return f"workflow_execution_index/{workflow_id}/" + + +def workflow_execution_index_key( + workflow_id: str, + started_at: int, + exec_id: str, +) -> str: + """Return the index key used to trim one workflow without full-table scans.""" + return f"{workflow_execution_index_prefix(workflow_id)}{started_at:020d}/{exec_id}" + + def workflow_execution_step_key(exec_id: str, step_index: int) -> str: """Return the storage key for one workflow execution step.""" return f"workflow_execution_step/{exec_id}/{step_index:08d}" @@ -495,7 +510,14 @@ async def create_execution_record( input_params=compacted_params, exec_id=exec_id, ) - await Storage.write(workflow_execution_key(exec_data["id"]), compact_execution_summary(exec_data)) + exec_key = workflow_execution_key(exec_data["id"]) + await Storage.write(exec_key, compact_execution_summary(exec_data)) + await _write_execution_index( + workflow_id=workflow_id, + exec_id=str(exec_data["id"]), + execution_key=exec_key, + started_at=int(exec_data.get("startedAt") or 0), + ) return exec_data @@ -512,6 +534,12 @@ async def record_execution_result( summary_data["stepCount"] = backfilled_steps await Storage.write(workflow_execution_key(exec_id), compact_execution_summary(summary_data)) + await _write_execution_index( + workflow_id=workflow_id, + exec_id=exec_id, + execution_key=workflow_execution_key(exec_id), + started_at=int(summary_data.get("startedAt") or 0), + ) # Update call/success/error counters so all trigger paths (HTTP, syslog, etc.) # are reflected in the UI stats panel. @@ -554,78 +582,113 @@ async def _record_audit() -> None: pass # Prune old execution records when the per-workflow limit is exceeded. - # Throttled by a per-workflow counter to amortise the O(N) storage scan. + # This is awaited so a successful completion does not silently leave the + # workflow above its retention cap. try: - counter = _trim_counters.get(workflow_id, 0) + 1 - _trim_counters[workflow_id] = counter - if counter >= _TRIM_CHECK_INTERVAL: - _trim_counters[workflow_id] = 0 - # Run trim in the background as well; it scans all execution rows - # and we don't want to delay the caller. - try: - asyncio.create_task( - _trim_execution_history(workflow_id), - name=f"trim-{workflow_id}", - ) - except RuntimeError: - await _trim_execution_history(workflow_id) - except Exception: - pass + await _trim_execution_history(workflow_id) + except Exception as exc: + log.error("workflow.history.trim_failed", { + "workflow_id": workflow_id, + "exec_id": exec_id, + "error": str(exc), + }) + + +async def _write_execution_index( + *, + workflow_id: str, + exec_id: str, + execution_key: str, + started_at: int, +) -> str: + index_key = workflow_execution_index_key(workflow_id, started_at, exec_id) + await Storage.write( + index_key, + { + "workflowId": workflow_id, + "execId": exec_id, + "executionKey": execution_key, + "startedAt": started_at, + }, + ) + return index_key + + +async def _list_workflow_execution_entries( + workflow_id: str, +) -> List[Tuple[str, int, Optional[str]]]: + """Return indexed ``(execution_key, started_at, index_key)`` rows for one workflow.""" + indexed_rows = await Storage.list_entries(workflow_execution_index_prefix(workflow_id)) + entries: List[Tuple[str, int, Optional[str]]] = [] + for index_key, value in indexed_rows: + if not isinstance(value, dict): + continue + exec_key = value.get("executionKey") + exec_id = value.get("execId") + if not isinstance(exec_key, str) and isinstance(exec_id, str): + exec_key = workflow_execution_key(exec_id) + if not isinstance(exec_key, str): + continue + started_at = _as_positive_int(value.get("startedAt")) or 0 + entries.append((exec_key, started_at, index_key)) + return entries -# Regex patterns to extract scalar fields from raw JSON strings without -# calling json.loads. workflowId/startedAt are always serialised near the -# start of the record (set in build_initial_execution_record), so we only -# scan a small prefix of each value string — O(prefix) instead of O(value). -_RE_WORKFLOW_ID = re.compile(r'"workflowId"\s*:\s*"([^"]+)"') -_RE_STARTED_AT = re.compile(r'"startedAt"\s*:\s*(\d+)') +async def _delete_execution_history_record( + execution_key: str, + *, + index_key: Optional[str] = None, +) -> None: + exec_id = execution_key.rsplit("/", 1)[-1] + deleted_steps = await Storage.clear(workflow_execution_step_prefix(exec_id)) + removed_execution = await Storage.remove(execution_key) + if index_key: + await Storage.remove(index_key) + record_path = Recorder.paths().workflow_dir / f"{exec_id}.jsonl" + await asyncio.to_thread(record_path.unlink, missing_ok=True) + log.debug("workflow.history.trim_deleted", { + "exec_id": exec_id, + "execution_key": execution_key, + "steps": deleted_steps, + "removed_execution": removed_execution, + }) async def _trim_execution_history(workflow_id: str) -> None: """Delete the oldest execution records once the per-workflow cap is exceeded. - Uses ``Storage.list_raw`` + regex instead of ``list_entries`` + ``json.loads`` - so that scanning the execution-history table never materialises large JSON - blobs as Python objects. The previous approach caused 100% single-core CPU - usage (``json.raw_decode``) and drove RSS to 20 GB under syslog load. + New records carry a per-workflow ``workflow_execution_index`` key, so hot + trims avoid scanning unrelated workflows. This path is intentionally + index-only: if an old execution has no index key, it is outside the hot + retention path and should be handled by a separate migration/GC task. - Also guards against overlapping trim tasks via ``_trim_in_flight``: because - trims run as fire-and-forget background tasks, without the guard a slow trim - would spawn multiple concurrent scans that each load the full table - simultaneously, multiplying the memory spike. + A per-workflow lock serializes concurrent trims. Cleanup is awaited by + ``record_execution_result`` so the retention cap is enforced synchronously + instead of being an opportunistic background task. """ - # Coalesce overlapping trims: only one scan per workflow at a time. - if workflow_id in _trim_in_flight: - return - _trim_in_flight.add(workflow_id) - try: - raw_rows = await Storage.list_raw("workflow_execution/") - wf_entries: List[tuple] = [] - for key, value_str in raw_rows: - # Scan only the first 400 chars — enough for workflowId + startedAt. - head = value_str[:400] - m_wf = _RE_WORKFLOW_ID.search(head) - if not m_wf or m_wf.group(1) != workflow_id: - continue - m_ts = _RE_STARTED_AT.search(head) - started_at = int(m_ts.group(1)) if m_ts else 0 - wf_entries.append((key, started_at)) - + lock = _get_trim_lock(workflow_id) + async with lock: + wf_entries = await _list_workflow_execution_entries(workflow_id) if len(wf_entries) <= _MAX_EXECUTION_HISTORY_PER_WORKFLOW: return + # Sort ascending by startedAt and remove the oldest excess records. wf_entries.sort(key=lambda kd: kd[1]) excess = len(wf_entries) - _MAX_EXECUTION_HISTORY_PER_WORKFLOW - for key, _ in wf_entries[:excess]: + failures: List[str] = [] + for key, _started_at, index_key in wf_entries[:excess]: try: - exec_id = key.rsplit("/", 1)[-1] - await Storage.remove(key) - step_rows = await Storage.list_raw(workflow_execution_step_prefix(exec_id)) - for step_key, _value in step_rows: - await Storage.remove(step_key) - record_path = Recorder.paths().workflow_dir / f"{exec_id}.jsonl" - await asyncio.to_thread(record_path.unlink, missing_ok=True) - except Exception: - pass - finally: - _trim_in_flight.discard(workflow_id) + await _delete_execution_history_record(key, index_key=index_key) + except Exception as exc: + failures.append(f"{key}: {exc}") + log.warning("workflow.history.trim_delete_failed", { + "workflow_id": workflow_id, + "key": key, + "error": str(exc), + }) + + if failures: + raise RuntimeError( + "Failed to trim workflow execution history: " + + "; ".join(failures[:3]) + ) diff --git a/pyproject.toml b/pyproject.toml index 303f222c3..45d82a8ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "flocks" -version = "v2026.6.18" +version = "v2026.6.24" description = "AI-Native SecOps platform with multi-agent collaboration" authors = [ {name = "Flocks Team", email = "team@example.com"} diff --git a/tests/cli/test_service_manager.py b/tests/cli/test_service_manager.py index c8e07d8df..ba42f890d 100644 --- a/tests/cli/test_service_manager.py +++ b/tests/cli/test_service_manager.py @@ -179,6 +179,24 @@ def fail_if_called(*_args, **_kwargs): assert service_manager.pid_is_running(456) is False +def test_windows_process_snapshot_handles_empty_stdout(monkeypatch) -> None: + monkeypatch.setattr(service_manager.sys, "platform", "win32") + monkeypatch.setattr(service_manager, "which", lambda name: "powershell.exe" if name == "powershell" else None) + + def fake_run(*_args, **kwargs): + assert kwargs["encoding"] == "utf-8" + assert kwargs["errors"] == "replace" + return SimpleNamespace(returncode=0, stdout=None) + + monkeypatch.setattr(service_manager.subprocess, "run", fake_run) + + assert service_manager._windows_process_snapshot(123) == { + "name": "", + "command_line": "", + "executable_path": "", + } + + def test_process_group_is_running_ignores_permission_error_without_live_members(monkeypatch) -> None: monkeypatch.setattr(service_manager.sys, "platform", "darwin") monkeypatch.setattr( @@ -1191,6 +1209,53 @@ def fake_spawn(command, **kwargs): assert record.port == 5174 +def test_start_frontend_tolerates_windows_node_assertion_after_build(monkeypatch, tmp_path: Path) -> None: + paths = service_manager.RuntimePaths( + root=tmp_path, + run_dir=tmp_path / "run", + log_dir=tmp_path / "logs", + backend_pid=tmp_path / "run" / "backend.pid", + frontend_pid=tmp_path / "run" / "webui.pid", + backend_log=tmp_path / "logs" / "backend.log", + frontend_log=tmp_path / "logs" / "webui.log", + ) + paths.run_dir.mkdir(parents=True) + paths.log_dir.mkdir(parents=True) + webui_dir = tmp_path / "webui" + webui_dist = webui_dir / "dist" + webui_dist.mkdir(parents=True) + console = DummyConsole() + preview_calls: list[list[str]] = [] + + def fake_run(_command, **_kwargs): + (webui_dist / "index.html").write_text("", encoding="utf-8") + return SimpleNamespace( + returncode=3221226505, + stdout="built in 6.83s", + stderr="Assertion failed: !(handle->flags & UV_HANDLE_CLOSING), file src\\win\\async.c, line 76", + ) + + def fake_spawn(command, **_kwargs): + preview_calls.append(list(command)) + return SimpleNamespace(pid=2468) + + monkeypatch.setattr(service_manager.sys, "platform", "win32") + monkeypatch.setattr(service_manager, "ensure_install_layout", lambda: tmp_path) + monkeypatch.setattr(service_manager, "ensure_runtime_dirs", lambda: paths) + monkeypatch.setattr(service_manager, "cleanup_stale_pid_file", lambda _path: None) + monkeypatch.setattr(service_manager, "port_owner_pids", lambda _port: []) + monkeypatch.setattr(service_manager, "wait_for_http", lambda *_args, **_kwargs: None) + monkeypatch.setattr(service_manager, "resolve_npm_executable", lambda: "npm.cmd") + monkeypatch.setattr(service_manager, "node_version_satisfies_requirement", lambda: True) + monkeypatch.setattr(service_manager.subprocess, "run", fake_run) + monkeypatch.setattr(service_manager, "_spawn_process", fake_spawn) + + service_manager.start_frontend(service_manager.ServiceConfig(), console) + + assert preview_calls[0][:3] == ["npm.cmd", "run", "preview"] + assert "[flocks] WebUI 构建产物已生成,忽略 Windows Node.js 退出断言。" in console.messages + + def test_start_frontend_passes_direct_backend_urls_when_opted_in(monkeypatch, tmp_path: Path) -> None: paths = service_manager.RuntimePaths( root=tmp_path, @@ -1400,6 +1465,37 @@ def fake_popen(*args, **kwargs): assert not (tmp_path / "logs" / "backend.log.1").exists() +def test_cap_service_log_file_keeps_recent_bytes_without_rename(tmp_path: Path) -> None: + log_path = tmp_path / "logs" / "backend.log" + log_path.parent.mkdir(parents=True) + log_path.write_bytes(b"0123456789") + + trimmed = service_manager._cap_service_log_file(log_path, max_bytes=4) + + assert trimmed is True + assert log_path.read_bytes() == b"6789" + assert not (tmp_path / "logs" / "backend.log.1").exists() + + +def test_spawn_process_caps_log_before_appending(monkeypatch, tmp_path: Path) -> None: + log_path = tmp_path / "logs" / "backend.log" + log_path.parent.mkdir(parents=True) + log_path.write_bytes(b"0123456789") + + def fake_popen(*args, **kwargs): + kwargs["stdout"].write("new\n") + kwargs["stdout"].flush() + return SimpleNamespace(pid=9876) + + monkeypatch.setattr(service_manager.sys, "platform", "darwin") + monkeypatch.setattr(service_manager, "MAX_SERVICE_LOG_BYTES", 4) + monkeypatch.setattr(service_manager.subprocess, "Popen", fake_popen) + + service_manager._spawn_process(["python", "-m", "uvicorn"], cwd=tmp_path, log_path=log_path) + + assert log_path.read_text(encoding="utf-8") == "6789new\n" + + def test_spawn_process_passes_custom_environment(monkeypatch, tmp_path: Path) -> None: captured = {} log_path = tmp_path / "logs" / "backend.log" diff --git a/tests/server/routes/test_workflow_trigger_routes.py b/tests/server/routes/test_workflow_trigger_routes.py index f183f6e0d..d811c5034 100644 --- a/tests/server/routes/test_workflow_trigger_routes.py +++ b/tests/server/routes/test_workflow_trigger_routes.py @@ -519,6 +519,8 @@ async def test_delete_workflow_cleans_directory_and_storage( f"workflow_release/{workflow_id}/active", f"workflow_release/{workflow_id}/rel-1", workflow_routes._workflow_execution_key("exec-delete"), + "workflow_execution_step/exec-delete/00000001", + f"workflow_execution_index/{workflow_id}/00000000000000000001/exec-delete", ] for key in storage_keys: payload = {"workflowId": workflow_id} diff --git a/tests/storage/test_multi_db_routing.py b/tests/storage/test_multi_db_routing.py new file mode 100644 index 000000000..1cdfb93b2 --- /dev/null +++ b/tests/storage/test_multi_db_routing.py @@ -0,0 +1,395 @@ +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +from flocks.config.config import Config +from flocks.storage.storage import Storage, StorageError +from flocks.task.models import TaskExecution, TaskScheduler +from flocks.task.store import TaskStore, _TASKS_DDL + + +def _reset_state() -> None: + Config._global_config = None + Config._cached_config = None + Storage._db_path = None + Storage._initialized = False + Storage._init_pid = None + TaskStore._initialized = False + TaskStore._conn = None + TaskStore._init_pid = None + + +def _fetch_storage_value(db_path: Path, key: str): + conn = sqlite3.connect(db_path) + try: + row = conn.execute( + "SELECT value FROM storage WHERE key = ?", + (key,), + ).fetchone() + return row[0] if row else None + finally: + conn.close() + + +def _fetch_table_count(db_path: Path, table_name: str) -> int: + conn = sqlite3.connect(db_path) + try: + return conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0] + finally: + conn.close() + + +@pytest.fixture(autouse=True) +async def isolated_multi_db_env(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + data_dir = tmp_path / "flocks_data" + data_dir.mkdir(parents=True, exist_ok=True) + monkeypatch.setenv("FLOCKS_DATA_DIR", str(data_dir)) + _reset_state() + yield + await TaskStore.close() + _reset_state() + + +@pytest.mark.asyncio +async def test_workflow_keys_route_to_workflow_db() -> None: + await Storage.init() + + await Storage.write("workflow/wf-1", {"name": "workflow"}) + await Storage.write("project/proj-1", {"name": "project"}) + + flocks_db = Storage.get_db_path() + workflow_db = Storage.get_workflow_db_path() + + assert _fetch_storage_value(workflow_db, "workflow/wf-1") is not None + assert _fetch_storage_value(flocks_db, "workflow/wf-1") is None + assert _fetch_storage_value(flocks_db, "project/proj-1") is not None + assert _fetch_storage_value(workflow_db, "project/proj-1") is None + assert await Storage.read("workflow/wf-1") == {"name": "workflow"} + assert await Storage.list_keys("workflow") == ["workflow/wf-1"] + + +@pytest.mark.asyncio +async def test_short_non_workflow_prefix_stays_on_flocks_db() -> None: + await Storage.init() + + await Storage.write("workspace/item-1", {"name": "workspace"}) + await Storage.write("workflow/item-1", {"name": "workflow"}) + + assert await Storage.list_keys("work") == ["workspace/item-1"] + + +@pytest.mark.asyncio +async def test_clear_without_prefix_clears_flocks_and_workflow_dbs() -> None: + await Storage.init() + + await Storage.write("project/proj-1", {"name": "project"}) + await Storage.write("workflow/wf-1", {"name": "workflow"}) + + assert await Storage.clear() == 2 + assert await Storage.read("project/proj-1") is None + assert await Storage.read("workflow/wf-1") is None + + +@pytest.mark.asyncio +async def test_list_without_prefix_merges_flocks_and_workflow_dbs() -> None: + await Storage.init() + + await Storage.write("project/proj-1", {"name": "project"}) + await Storage.write("workflow/wf-1", {"name": "workflow"}) + + keys = await Storage.list_keys() + entries = dict(await Storage.list_entries()) + raw_entries = dict(await Storage.list_raw()) + + assert {"project/proj-1", "workflow/wf-1"}.issubset(set(keys)) + assert entries["project/proj-1"] == {"name": "project"} + assert entries["workflow/wf-1"] == {"name": "workflow"} + assert raw_entries["project/proj-1"] == '{"name": "project"}' + assert raw_entries["workflow/wf-1"] == '{"name": "workflow"}' + + +@pytest.mark.asyncio +async def test_workflow_kv_migrates_from_legacy_flocks_db() -> None: + flocks_db = Config.get_data_path() / "flocks.db" + flocks_db.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(flocks_db) + try: + conn.execute( + """ + CREATE TABLE storage ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + conn.execute( + "INSERT INTO storage (key, value, type, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + ("workflow_registry/wf-legacy", '{"ok": true}', "json", "old", "old"), + ) + conn.execute( + "INSERT INTO storage (key, value, type, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + ("session:legacy", '{"ok": false}', "json", "old", "old"), + ) + conn.commit() + finally: + conn.close() + + await Storage.init() + + workflow_db = Storage.get_workflow_db_path() + assert _fetch_storage_value(workflow_db, "workflow_registry/wf-legacy") == '{"ok": true}' + assert _fetch_storage_value(flocks_db, "workflow_registry/wf-legacy") == '{"ok": true}' + assert _fetch_storage_value(workflow_db, "session:legacy") is None + marker = await Storage.get(Storage._multi_db_migration_marker_key) + assert marker["workflow_migrated"] is True + assert marker["workflow_rows"] == 1 + assert marker["workflow_source_rows_deleted"] == 0 + + +@pytest.mark.asyncio +async def test_workflow_prefix_migration_treats_underscore_literally() -> None: + flocks_db = Config.get_data_path() / "flocks.db" + flocks_db.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(flocks_db) + try: + conn.execute( + """ + CREATE TABLE storage ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + conn.execute( + "INSERT INTO storage (key, value, type, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + ("workflow_registry/wf-ok", '{"ok": true}', "json", "old", "old"), + ) + conn.execute( + "INSERT INTO storage (key, value, type, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + ("workflowXregistry/wf-bad", '{"bad": true}', "json", "old", "old"), + ) + conn.commit() + finally: + conn.close() + + await Storage.init() + + workflow_db = Storage.get_workflow_db_path() + assert _fetch_storage_value(workflow_db, "workflow_registry/wf-ok") == '{"ok": true}' + assert _fetch_storage_value(workflow_db, "workflowXregistry/wf-bad") is None + assert _fetch_storage_value(flocks_db, "workflow_registry/wf-ok") == '{"ok": true}' + assert _fetch_storage_value(flocks_db, "workflowXregistry/wf-bad") == '{"bad": true}' + assert await Storage.list_keys("workflow_registry/") == ["workflow_registry/wf-ok"] + + +@pytest.mark.asyncio +async def test_completed_workflow_migration_fails_if_workflow_db_disappears() -> None: + flocks_db = Config.get_data_path() / "flocks.db" + flocks_db.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(flocks_db) + try: + conn.execute( + """ + CREATE TABLE storage ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """ + ) + conn.execute( + "INSERT INTO storage (key, value, type, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + ("workflow/wf-legacy", '{"ok": true}', "json", "old", "old"), + ) + conn.commit() + finally: + conn.close() + + await Storage.init() + workflow_db = Storage.get_workflow_db_path() + assert workflow_db.exists() + workflow_db.unlink() + _reset_state() + + with pytest.raises(StorageError, match="workflow.db is missing"): + await Storage.init() + + +@pytest.mark.asyncio +async def test_task_store_uses_tasks_db_and_migrates_existing_task_tables() -> None: + await Storage.init() + flocks_db = Storage.get_db_path() + conn = sqlite3.connect(flocks_db) + try: + conn.executescript(_TASKS_DDL) + legacy_scheduler = TaskScheduler(title="legacy task") + conn.execute( + """ + INSERT INTO task_schedulers + (id, title, description, mode, status, priority, source, trigger, + execution_mode, agent_name, workflow_id, skills, category, context, + workspace_directory, retry, tags, created_at, updated_at, created_by, dedup_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + TaskStore._scheduler_to_row(legacy_scheduler), + ) + legacy_orphan_execution = TaskExecution( + schedulerID="missing-legacy-scheduler", + title="legacy orphan execution", + ) + legacy_non_utf8_execution = TaskExecution( + schedulerID=legacy_scheduler.id, + title="legacy non-utf8 execution", + ) + conn.execute( + """ + INSERT INTO task_executions + (id, scheduler_id, title, description, priority, source, trigger_type, + status, delivery_status, queued_at, started_at, completed_at, duration_ms, + session_id, result_summary, error, execution_input_snapshot, + workspace_directory, retry, execution_mode, agent_name, workflow_id, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + TaskStore._execution_to_row(legacy_orphan_execution), + ) + conn.execute( + """ + INSERT INTO task_executions + (id, scheduler_id, title, description, priority, source, trigger_type, + status, delivery_status, queued_at, started_at, completed_at, duration_ms, + session_id, result_summary, error, execution_input_snapshot, + workspace_directory, retry, execution_mode, agent_name, workflow_id, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + TaskStore._execution_to_row(legacy_non_utf8_execution), + ) + conn.execute( + "UPDATE task_executions SET description = CAST(? AS TEXT) WHERE id = ?", + ("扫描 Windows 主机 192.168.254.1".encode("gbk"), legacy_non_utf8_execution.id), + ) + conn.commit() + finally: + conn.close() + + await TaskStore.init() + + tasks_db = TaskStore.get_db_path() + conn = sqlite3.connect(tasks_db) + try: + row = conn.execute( + "SELECT title FROM task_schedulers WHERE id = ?", + (legacy_scheduler.id,), + ).fetchone() + orphan_row = conn.execute( + "SELECT title FROM task_executions WHERE id = ?", + (legacy_orphan_execution.id,), + ).fetchone() + non_utf8_row = conn.execute( + "SELECT description FROM task_executions WHERE id = ?", + (legacy_non_utf8_execution.id,), + ).fetchone() + finally: + conn.close() + assert row[0] == "legacy task" + assert orphan_row[0] == "legacy orphan execution" + assert non_utf8_row[0] == "扫描 Windows 主机 192.168.254.1" + + new_scheduler = TaskScheduler(title="new task") + await TaskStore.create_scheduler(new_scheduler) + assert await TaskStore.get_scheduler(new_scheduler.id) is not None + + conn = sqlite3.connect(flocks_db) + try: + row = conn.execute( + "SELECT 1 FROM task_schedulers WHERE id = ?", + (new_scheduler.id,), + ).fetchone() + finally: + conn.close() + assert row is None + + marker = await Storage.get(Storage._multi_db_migration_marker_key) + assert marker["tasks_migrated"] is True + assert marker["task_rows"] == 3 + assert marker["task_foreign_key_violations"] == 1 + assert marker["task_source_rows_deleted"] == 0 + assert _fetch_table_count(flocks_db, "task_schedulers") == 1 + assert _fetch_table_count(flocks_db, "task_executions") == 2 + + await TaskStore.close() + TaskStore._initialized = False + await TaskStore.init() + conn = sqlite3.connect(tasks_db) + try: + count = conn.execute("SELECT COUNT(*) FROM task_schedulers").fetchone()[0] + finally: + conn.close() + assert count == 2 + + +@pytest.mark.asyncio +async def test_completed_task_migration_fails_if_tasks_db_disappears() -> None: + await Storage.init() + flocks_db = Storage.get_db_path() + conn = sqlite3.connect(flocks_db) + try: + conn.executescript(_TASKS_DDL) + legacy_scheduler = TaskScheduler(title="legacy task") + conn.execute( + """ + INSERT INTO task_schedulers + (id, title, description, mode, status, priority, source, trigger, + execution_mode, agent_name, workflow_id, skills, category, context, + workspace_directory, retry, tags, created_at, updated_at, created_by, dedup_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + TaskStore._scheduler_to_row(legacy_scheduler), + ) + conn.commit() + finally: + conn.close() + + await TaskStore.init() + tasks_db = TaskStore.get_db_path() + assert tasks_db.exists() + await TaskStore.close() + tasks_db.unlink() + _reset_state() + + with pytest.raises(RuntimeError, match="tasks.db is missing"): + await TaskStore.init() + + +@pytest.mark.asyncio +async def test_task_store_init_failure_clears_half_open_connection( + monkeypatch: pytest.MonkeyPatch, +) -> None: + await Storage.init() + + async def fail_migration(*_args, **_kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr( + TaskStore, + "_migrate_task_tables_to_tasks_db", + classmethod(fail_migration), + ) + + with pytest.raises(RuntimeError, match="boom"): + await TaskStore.init() + + assert TaskStore._conn is None + assert TaskStore._initialized is False diff --git a/tests/task/test_task.py b/tests/task/test_task.py index fb4cbbfee..f2b31aad7 100644 --- a/tests/task/test_task.py +++ b/tests/task/test_task.py @@ -94,6 +94,39 @@ async def test_immediate_scheduler_creates_single_queued_execution(tmp_path: Pat assert execution.completed_at is None +@pytest.mark.asyncio +async def test_list_executions_tolerates_legacy_non_utf8_description( + tmp_path: Path, +): + legacy_description = "扫描 Windows 主机 192.168.254.1" + scheduler = await TaskManager.create_scheduler( + title="legacy encoding", + description="placeholder", + mode=SchedulerMode.ONCE, + priority=TaskPriority.NORMAL, + trigger=TaskTrigger(run_immediately=True), + workspace_directory=str(tmp_path / "workspace"), + ) + executions, _ = await TaskManager.list_scheduler_executions(scheduler.id) + execution_id = executions[0].id + + db = await TaskStore.raw_db() + await db.execute( + """ + UPDATE task_executions + SET description = CAST(? AS TEXT) + WHERE id = ? + """, + (legacy_description.encode("gbk"), execution_id), + ) + await db.commit() + + executions, total = await TaskManager.list_executions(limit=20) + + assert total == 1 + assert executions[0].description == legacy_description + + @pytest.mark.asyncio async def test_once_scheduler_tick_creates_execution_and_disables_scheduler(tmp_path: Path): scheduler = await TaskManager.create_scheduler( diff --git a/tests/tool/test_device_context_prompt.py b/tests/tool/test_device_context_prompt.py index ba1a420a5..487f9d817 100644 --- a/tests/tool/test_device_context_prompt.py +++ b/tests/tool/test_device_context_prompt.py @@ -94,6 +94,7 @@ async def test_device_context_deduplicates_tool_sets_and_references_them_from_de content = await build_device_context_section() assert content is not None + assert "设备不存在时,请提醒用户前往设备接入页面添加设备。" in content assert "工具名和描述:" in content assert "工具能力:" not in content assert "action 可选:" not in content diff --git a/tests/tool/test_question_handler.py b/tests/tool/test_question_handler.py new file mode 100644 index 000000000..75b7335ac --- /dev/null +++ b/tests/tool/test_question_handler.py @@ -0,0 +1,49 @@ +from unittest.mock import AsyncMock + +import pytest + + +@pytest.mark.asyncio +async def test_api_question_handler_publishes_rejected_event_on_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from flocks.server.routes import event as event_routes + from flocks.server.routes.question import list_question_requests + from flocks.tool import question_handler + from flocks.utils.id import Identifier + + events: list[tuple[str, dict]] = [] + + async def fake_publish_event(event_type: str, payload: dict) -> None: + events.append((event_type, payload)) + + monkeypatch.setattr(Identifier, "ascending", lambda _prefix: "question_timeout_req") + monkeypatch.setattr(question_handler.asyncio, "sleep", AsyncMock()) + monkeypatch.setattr(event_routes, "publish_event", fake_publish_event) + + with pytest.raises(TimeoutError): + await question_handler.api_question_handler( + "ses_question_timeout", + [{"question": "Continue?", "type": "confirm"}], + ) + + assert ("question.asked", { + "id": "question_timeout_req", + "sessionID": "ses_question_timeout", + "questions": [{ + "question": "Continue?", + "header": "", + "type": "confirm", + "options": [], + "multiple": False, + "placeholder": "", + "multiline": False, + "custom": True, + }], + }) in events + assert ("question.rejected", { + "sessionID": "ses_question_timeout", + "requestID": "question_timeout_req", + "reason": "timeout", + }) in events + assert list_question_requests("ses_question_timeout") == [] diff --git a/tests/updater/test_updater.py b/tests/updater/test_updater.py index 056ddd7b1..1a176274c 100644 --- a/tests/updater/test_updater.py +++ b/tests/updater/test_updater.py @@ -2109,6 +2109,46 @@ async def fake_run_async(cmd, cwd=None, timeout=None, env=None): ] +@pytest.mark.asyncio +async def test_build_frontend_workspace_tolerates_windows_node_assertion_after_build( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + webui_dir = tmp_path / "webui" + webui_dir.mkdir() + (webui_dir / "package.json").write_text("{}", encoding="utf-8") + bundled_npm = str(tmp_path / "npm.cmd") + run_calls: list[list[str]] = [] + + async def fake_run_async(cmd, cwd=None, timeout=None, env=None): + run_calls.append(list(cmd)) + if cmd == [bundled_npm, "install"]: + return 0, "", "" + if cmd == [bundled_npm, "run", "build"]: + dist_dir = webui_dir / "dist" + dist_dir.mkdir() + (dist_dir / "index.html").write_text("", encoding="utf-8") + return ( + 3221226505, + "built in 6.83s", + "Assertion failed: !(handle->flags & UV_HANDLE_CLOSING), file src\\win\\async.c, line 76", + ) + raise AssertionError(f"unexpected command: {cmd}") + + monkeypatch.setattr(updater.sys, "platform", "win32") + monkeypatch.setattr( + updater, + "_resolve_frontend_npm_candidates", + lambda *, npm_registry=None: [ + updater._FrontendNpmCandidate(npm=bundled_npm, env=None, source="bundled"), + ], + ) + monkeypatch.setattr(updater, "_run_async", fake_run_async) + + assert await updater._build_frontend_workspace(webui_dir) is None + assert run_calls == [[bundled_npm, "install"], [bundled_npm, "run", "build"]] + + @pytest.mark.asyncio async def test_perform_update_retries_windows_frontend_with_full_timeout_after_bundled_install_and_ci_timeout( monkeypatch: pytest.MonkeyPatch, diff --git a/tests/workflow/test_execution_store_compact.py b/tests/workflow/test_execution_store_compact.py index 1707280de..9e95f2813 100644 --- a/tests/workflow/test_execution_store_compact.py +++ b/tests/workflow/test_execution_store_compact.py @@ -16,8 +16,6 @@ """ from __future__ import annotations - -import json from typing import Any, Dict, List from unittest.mock import AsyncMock, patch @@ -31,6 +29,8 @@ compact_outputs_for_storage, compact_step_for_storage, record_execution_result, + workflow_execution_index_key, + workflow_execution_step_prefix, workflow_execution_step_key, ) from flocks.storage.storage import Storage @@ -307,6 +307,7 @@ def raise_create_task(coro, *args, **kwargs): # noqa: ANN001, ARG001 assert write_calls[2].args[0] == "workflow_execution/exec-1" assert write_calls[2].args[1]["executionLog"] == [] assert write_calls[2].args[1]["stepCount"] == 2 + assert write_calls[3].args[0].startswith("workflow_execution_index/wf/") def test_compact_history_compacts_each_step_inputs() -> None: @@ -397,14 +398,16 @@ async def test_trim_execution_history_keeps_only_30_and_deletes_matching_jsonl( tmp_path, ) -> None: workflow_id = "wf-trim" - entries = [] + indexed_rows = [] for idx in range(32): exec_id = f"exec-{idx:02d}" - entries.append(( - f"workflow_execution/{exec_id}", + index_key = workflow_execution_index_key(workflow_id, idx, exec_id) + indexed_rows.append(( + index_key, { - "id": exec_id, "workflowId": workflow_id, + "execId": exec_id, + "executionKey": f"workflow_execution/{exec_id}", "startedAt": idx, }, )) @@ -412,34 +415,101 @@ async def test_trim_execution_history_keeps_only_30_and_deletes_matching_jsonl( workflow_record.parent.mkdir(parents=True, exist_ok=True) workflow_record.write_text('{"type":"workflow.summary"}\n', encoding="utf-8") - # Another workflow's record should be ignored entirely. - entries.append(( - "workflow_execution/other-exec", - {"id": "other-exec", "workflowId": "wf-other", "startedAt": 0}, - )) + # Another workflow's record should be ignored entirely because the trim + # only reads workflow_execution_index//. other_record = tmp_path / "workflow" / "other-exec.jsonl" other_record.parent.mkdir(parents=True, exist_ok=True) other_record.write_text('{"type":"workflow.summary"}\n', encoding="utf-8") remove_mock = AsyncMock(return_value=None) - raw_entries = [(key, json.dumps(value)) for key, value in entries] - - async def list_raw_side_effect(prefix: str): - if prefix == "workflow_execution/": - return raw_entries - return [] + clear_mock = AsyncMock(return_value=2) + list_raw_mock = AsyncMock(return_value=[]) - with patch.object(Storage, "list_raw", AsyncMock(side_effect=list_raw_side_effect)), \ + with patch.object(Storage, "list_entries", AsyncMock(return_value=indexed_rows)), \ + patch.object(Storage, "list_raw", list_raw_mock), \ + patch.object(Storage, "clear", clear_mock), \ patch.object(Storage, "remove", remove_mock), \ patch("flocks.session.recorder._record_dir", return_value=tmp_path): await _trim_execution_history(workflow_id) + list_raw_mock.assert_not_awaited() removed_keys = [call.args[0] for call in remove_mock.await_args_list] - assert removed_keys == [ - "workflow_execution/exec-00", - "workflow_execution/exec-01", + assert "workflow_execution/exec-00" in removed_keys + assert "workflow_execution/exec-01" in removed_keys + assert workflow_execution_index_key(workflow_id, 0, "exec-00") in removed_keys + assert workflow_execution_index_key(workflow_id, 1, "exec-01") in removed_keys + cleared_prefixes = [call.args[0] for call in clear_mock.await_args_list] + assert cleared_prefixes == [ + workflow_execution_step_prefix("exec-00"), + workflow_execution_step_prefix("exec-01"), ] assert not (tmp_path / "workflow" / "exec-00.jsonl").exists() assert not (tmp_path / "workflow" / "exec-01.jsonl").exists() assert (tmp_path / "workflow" / "exec-02.jsonl").exists() assert other_record.exists() + + +@pytest.mark.asyncio +async def test_trim_execution_history_uses_index_without_full_scan(tmp_path) -> None: + workflow_id = "wf-indexed" + indexed_rows = [] + for idx in range(32): + exec_id = f"exec-{idx:02d}" + index_key = workflow_execution_index_key(workflow_id, idx, exec_id) + indexed_rows.append(( + index_key, + { + "workflowId": workflow_id, + "execId": exec_id, + "executionKey": f"workflow_execution/{exec_id}", + "startedAt": idx, + }, + )) + workflow_record = tmp_path / "workflow" / f"{exec_id}.jsonl" + workflow_record.parent.mkdir(parents=True, exist_ok=True) + workflow_record.write_text('{"type":"workflow.summary"}\n', encoding="utf-8") + + list_raw_mock = AsyncMock(return_value=[]) + clear_mock = AsyncMock(return_value=1) + remove_mock = AsyncMock(return_value=True) + + with patch.object(Storage, "list_entries", AsyncMock(return_value=indexed_rows)), \ + patch.object(Storage, "list_raw", list_raw_mock), \ + patch.object(Storage, "clear", clear_mock), \ + patch.object(Storage, "remove", remove_mock), \ + patch("flocks.session.recorder._record_dir", return_value=tmp_path): + await _trim_execution_history(workflow_id) + + list_raw_mock.assert_not_awaited() + assert [call.args[0] for call in clear_mock.await_args_list] == [ + workflow_execution_step_prefix("exec-00"), + workflow_execution_step_prefix("exec-01"), + ] + removed = [call.args[0] for call in remove_mock.await_args_list] + assert "workflow_execution/exec-00" in removed + assert workflow_execution_index_key(workflow_id, 0, "exec-00") in removed + + +@pytest.mark.asyncio +async def test_trim_execution_history_surfaces_delete_failures() -> None: + workflow_id = "wf-trim-fail" + indexed_rows = [ + ( + workflow_execution_index_key(workflow_id, idx, f"exec-{idx:02d}"), + { + "workflowId": workflow_id, + "execId": f"exec-{idx:02d}", + "executionKey": f"workflow_execution/exec-{idx:02d}", + "startedAt": idx, + }, + ) + for idx in range(31) + ] + list_raw_mock = AsyncMock(return_value=[]) + + with patch.object(Storage, "list_entries", AsyncMock(return_value=indexed_rows)), \ + patch.object(Storage, "list_raw", list_raw_mock), \ + patch.object(Storage, "clear", AsyncMock(side_effect=RuntimeError("locked"))): + with pytest.raises(RuntimeError, match="Failed to trim workflow execution history"): + await _trim_execution_history(workflow_id) + list_raw_mock.assert_not_awaited() diff --git a/uv.lock b/uv.lock index 62cd1f857..3be641a1f 100644 --- a/uv.lock +++ b/uv.lock @@ -537,7 +537,7 @@ wheels = [ [[package]] name = "flocks" -version = "2026.6.18" +version = "2026.6.24" source = { editable = "." } dependencies = [ { name = "aiofiles" }, diff --git a/webui/src/components/common/EntitySheet.tsx b/webui/src/components/common/EntitySheet.tsx index d999bc1fb..802631221 100644 --- a/webui/src/components/common/EntitySheet.tsx +++ b/webui/src/components/common/EntitySheet.tsx @@ -866,7 +866,7 @@ function SheetTab({ return ( {opt.custom && selected && ( -
+
{multiple && answer.length > 0 && ( -
+
{t('question.selectedCount', { count: answer.length })}
)} @@ -320,7 +320,7 @@ function TextInput({ compact: boolean; }) { const { t } = useTranslation('common'); - const cls = `w-full border border-gray-300 rounded-lg px-3 py-2 ${compact ? 'text-xs' : 'text-sm'} text-gray-900 placeholder-gray-400 focus:border-purple-500 focus:ring-2 focus:ring-purple-100 outline-none`; + const cls = `w-full border border-zinc-300 rounded-lg px-3 py-2 ${compact ? 'text-xs' : 'text-sm'} text-zinc-900 placeholder-zinc-400 outline-none focus:border-emerald-500 focus:ring-2 focus:ring-emerald-100`; if (q.multiline) { return (