diff --git a/application/execution_service.py b/application/execution_service.py index 2e46eaa..e50230b 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -1012,7 +1012,7 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): continue can_buy_value = min(diff, investable_cash) if can_buy_value > price: - is_limit_order = symbol in limit_order_symbols + is_limit_order = symbol in limit_order_symbols or symbol == cash_sweep_symbol limit_order_kind = "limit" if is_limit_order else "market" limit_ref_price = round(price * limit_buy_premium, 2) if is_limit_order else round(price, 2) limit_candidate = _estimate_buy_quantity_candidate( @@ -1135,7 +1135,22 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): float(safe_haven_cash_substitute_threshold_usd or 0.0), ) if substitution_threshold <= 0.0 or investable_cash >= substitution_threshold: - quantity = int(investable_cash // cash_sweep_price) + ref_price = round(cash_sweep_price * limit_buy_premium, 2) + budget_quantity = floor_to_quantity_step(investable_cash / ref_price, 1.0) + cash_limit_quantity = estimate_cash_buy_quantity_safe( + trade_context, + market_symbol(cash_sweep_symbol), + "limit", + ref_price, + estimate_max_purchase_quantity=estimate_max_purchase_quantity, + notify_issue=notify_issue, + ) + if cash_limit_quantity is None: + quantity = 0 + else: + quantity = _normalize_trade_quantity( + min(budget_quantity, float(cash_limit_quantity)), + ) else: quantity = 0 if quantity > 0: @@ -1145,32 +1160,43 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): market_symbol(cash_sweep_symbol), "buy", quantity_text, - round(cash_sweep_price, 2), - order_type="market", + ref_price, + order_type="limit", ) else: submitted = submit_order_via_port( market_symbol(cash_sweep_symbol), - "market", + "limit", "buy", quantity, translator( - "market_buy", + "limit_buy", symbol=cash_sweep_symbol, qty=quantity_text, - price=round(cash_sweep_price, 2), + price=ref_price, ), + submitted_price=ref_price, ) if submitted: rebuy_message = translator( "cash_sweep_rebuy", symbol=market_symbol(cash_sweep_symbol), qty=quantity_text, - price=f"{cash_sweep_price:.2f}", + price=f"{ref_price:.2f}", ) note_logs.append(rebuy_message) print(with_prefix(rebuy_message), flush=True) action_done = True + elif substitution_threshold <= 0.0 or investable_cash >= substitution_threshold: + record_note_log( + note_logs, + translator=translator, + with_prefix=with_prefix, + kind="buy_deferred_cash_sweep_cash_limit", + symbol=market_symbol(cash_sweep_symbol), + investable=f"{investable_cash:.2f}", + budget_qty=format_quantity(budget_quantity), + ) return ExecutionCycleResult( plan=dict(plan or {}), diff --git a/application/execution_state.py b/application/execution_state.py new file mode 100644 index 0000000..9c34c45 --- /dev/null +++ b/application/execution_state.py @@ -0,0 +1,293 @@ +"""Execution marker storage for duplicate-run suppression.""" + +from __future__ import annotations + +import json +import re +import tempfile +from collections.abc import Callable, Mapping +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +DEFAULT_EXECUTION_STATE_DIR = "/tmp/longbridge_execution_state" +DEFAULT_EXECUTION_STATE_NAMESPACE = "execution_markers" + + +def _first_non_empty(*values: object) -> str: + for value in values: + text = str(value or "").strip() + if text: + return text + return "" + + +def _env_bool(value: object, *, default: bool) -> bool: + text = str(value or "").strip().lower() + if not text: + return default + if text in {"1", "true", "t", "yes", "y", "on"}: + return True + if text in {"0", "false", "f", "no", "n", "off"}: + return False + return default + + +def _parse_gcs_uri(uri: str) -> tuple[str, str]: + text = str(uri or "").strip() + if not text.startswith("gs://"): + raise ValueError(f"gcs uri must start with gs://, got: {uri!r}") + remainder = text[5:] + bucket, _, prefix = remainder.partition("/") + if not bucket: + raise ValueError(f"gcs uri must include a bucket, got: {uri!r}") + return bucket, prefix.strip("/") + + +def _clean_key_part(value: object, *, fallback: str) -> str: + text = str(value or "").strip().lower() + text = re.sub(r"[^a-z0-9._=-]+", "-", text) + text = re.sub(r"-{2,}", "-", text).strip("-.") + return text or fallback + + +def _clean_relative_key(key: str) -> str: + parts = [ + _clean_key_part(part, fallback="unknown") + for part in str(key or "").replace("\\", "/").split("/") + if str(part or "").strip() + ] + return "/".join(parts) or "unknown" + + +def build_execution_marker_key( + *, + platform: str, + strategy_profile: str, + account_scope: str, + execution_mode: str, + signal_date: object, + effective_date: object, + execution_timing_contract: object = None, +) -> str: + """Build a stable marker key for one strategy signal execution.""" + + signal = _first_non_empty(signal_date) + effective = _first_non_empty(effective_date) + if not signal and not effective: + return "" + return "/".join( + ( + "v1", + _clean_key_part(platform, fallback="platform"), + _clean_key_part(account_scope, fallback="account"), + _clean_key_part(strategy_profile, fallback="strategy"), + _clean_key_part(execution_mode, fallback="mode"), + _clean_key_part(signal or "no-signal-date", fallback="signal"), + _clean_key_part(effective or "no-effective-date", fallback="effective"), + _clean_key_part(execution_timing_contract or "no-contract", fallback="contract"), + ) + ) + + +@dataclass(frozen=True) +class ExecutionMarkerStore: + local_dir: str | Path | None = DEFAULT_EXECUTION_STATE_DIR + gcs_prefix_uri: str | None = None + gcp_project_id: str | None = None + namespace: str = DEFAULT_EXECUTION_STATE_NAMESPACE + client_factory: Any = None + prior_report_scan_limit: int = 100 + + def has_marker(self, marker_key: str) -> bool: + if not str(marker_key or "").strip(): + return False + if self.gcs_prefix_uri and self._gcs_blob(marker_key).exists(): + return True + if self.local_dir and self._local_path(marker_key).exists(): + return True + return False + + def record_marker( + self, + marker_key: str, + *, + metadata: Mapping[str, Any] | None = None, + ) -> None: + if not str(marker_key or "").strip(): + return + payload = { + "schema_version": "longbridge_execution_marker.v1", + "marker_key": str(marker_key), + "recorded_at": datetime.now(timezone.utc).isoformat(), + "metadata": dict(metadata or {}), + } + encoded = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True) + if self.gcs_prefix_uri: + self._gcs_blob(marker_key).upload_from_string( + encoded, + content_type="application/json", + ) + return + if self.local_dir: + path = self._local_path(marker_key) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(encoded, encoding="utf-8") + + def has_prior_execution_report( + self, + *, + platform: str, + strategy_profile: str, + account_scope: str, + signal_date: object, + effective_date: object, + dry_run_only: bool, + ) -> bool: + if not self.gcs_prefix_uri: + return False + signal = _first_non_empty(signal_date) + effective = _first_non_empty(effective_date) + if not signal and not effective: + return False + bucket_name, prefix = _parse_gcs_uri(str(self.gcs_prefix_uri or "")) + object_prefix = "/".join( + part.strip("/") + for part in ( + prefix, + _runtime_report_segment(platform), + _runtime_report_segment(strategy_profile), + _runtime_report_segment(account_scope), + ) + if part and part.strip("/") + ) + client = self._gcs_client() + scanned = 0 + for blob in client.list_blobs(bucket_name, prefix=object_prefix): + name = str(getattr(blob, "name", "") or "") + if not name.endswith(".json"): + continue + scanned += 1 + if scanned > max(1, int(self.prior_report_scan_limit or 1)): + break + try: + payload = json.loads(blob.download_as_text()) + except Exception: + continue + if _report_matches_execution( + payload, + platform=platform, + strategy_profile=strategy_profile, + account_scope=account_scope, + signal_date=signal, + effective_date=effective, + dry_run_only=dry_run_only, + ): + return True + return False + + def _local_path(self, marker_key: str) -> Path: + root = Path(self.local_dir or tempfile.gettempdir()).expanduser() + return root / self.namespace / f"{_clean_relative_key(marker_key)}.json" + + def _gcs_blob(self, marker_key: str): + bucket_name, prefix = _parse_gcs_uri(str(self.gcs_prefix_uri or "")) + object_name = "/".join( + part.strip("/") + for part in ( + prefix, + self.namespace, + f"{_clean_relative_key(marker_key)}.json", + ) + if part and part.strip("/") + ) + if self.client_factory is None: + try: + from google.cloud import storage # type: ignore + except ImportError as exc: + raise RuntimeError("google-cloud-storage is required for GCS execution markers") from exc + client_factory = storage.Client + else: + client_factory = self.client_factory + client = client_factory(project=self.gcp_project_id) if self.gcp_project_id else client_factory() + return client.bucket(bucket_name).blob(object_name) + + def _gcs_client(self): + if self.client_factory is None: + try: + from google.cloud import storage # type: ignore + except ImportError as exc: + raise RuntimeError("google-cloud-storage is required for GCS execution markers") from exc + client_factory = storage.Client + else: + client_factory = self.client_factory + return client_factory(project=self.gcp_project_id) if self.gcp_project_id else client_factory() + + +def build_execution_marker_store_from_env( + *, + env_reader: Callable[[str, str | None], str | None], + gcp_project_id: str | None = None, + client_factory: Any = None, +) -> ExecutionMarkerStore: + explicit_gcs_uri = env_reader("LONGBRIDGE_EXECUTION_STATE_GCS_URI", None) + report_gcs_uri = env_reader("EXECUTION_REPORT_GCS_URI", None) + local_dir = env_reader("LONGBRIDGE_EXECUTION_STATE_DIR", None) + return ExecutionMarkerStore( + local_dir=local_dir or DEFAULT_EXECUTION_STATE_DIR, + gcs_prefix_uri=explicit_gcs_uri or report_gcs_uri, + gcp_project_id=gcp_project_id, + client_factory=client_factory, + ) + + +def resolve_execution_dedup_enabled( + *, + env_reader: Callable[[str, str | None], str | None], + dry_run_only: bool, +) -> bool: + raw_value = env_reader("LONGBRIDGE_EXECUTION_DEDUP_ENABLED", None) + return _env_bool(raw_value, default=bool(dry_run_only)) + + +def _runtime_report_segment(value: object) -> str: + text = str(value or "").strip() + safe = "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text) + return safe or "unknown" + + +def _optional_str(value: object) -> str: + return str(value or "").strip() + + +def _report_matches_execution( + payload: Mapping[str, Any], + *, + platform: str, + strategy_profile: str, + account_scope: str, + signal_date: str, + effective_date: str, + dry_run_only: bool, +) -> bool: + report = dict(payload or {}) + if _optional_str(report.get("platform")).lower() != _optional_str(platform).lower(): + return False + if _optional_str(report.get("strategy_profile")).lower() != _optional_str(strategy_profile).lower(): + return False + if _optional_str(report.get("account_scope")).lower() != _optional_str(account_scope).lower(): + return False + if bool(report.get("dry_run")) != bool(dry_run_only): + return False + summary = dict(report.get("summary") or {}) + if signal_date and _optional_str(summary.get("signal_date")) != signal_date: + return False + if effective_date and _optional_str(summary.get("effective_date")) != effective_date: + return False + return ( + bool(summary.get("action_done")) + or int(float(summary.get("orders_previewed_count") or 0)) > 0 + or int(float(summary.get("order_events_count") or 0)) > 0 + ) diff --git a/application/rebalance_service.py b/application/rebalance_service.py index 80d9e15..aa9eb30 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -5,7 +5,8 @@ import re from datetime import datetime -from application.execution_service import execute_rebalance_cycle +from application.execution_service import ExecutionCycleResult, execute_rebalance_cycle +from application.execution_state import build_execution_marker_key from application.runtime_dependencies import LongBridgeRebalanceConfig, LongBridgeRebalanceRuntime from application.signal_snapshot import build_signal_snapshot from notifications.events import NotificationPublisher @@ -160,6 +161,71 @@ def _append_strategy_line(lines, *, strategy_display_name, translator): _append_status_lines = notification_renderers._append_status_lines +def _build_execution_marker_key(*, config: LongBridgeRebalanceConfig, execution: dict) -> str: + if not getattr(config, "execution_dedup_enabled", False): + return "" + execution_mode = "paper" if bool(getattr(config, "dry_run_only", False)) else "live" + return build_execution_marker_key( + platform="longbridge", + strategy_profile=getattr(config, "strategy_profile", "") or "unknown", + account_scope=getattr(config, "execution_state_account_scope", "") or "unknown", + execution_mode=execution_mode, + signal_date=execution.get("signal_date"), + effective_date=execution.get("effective_date"), + execution_timing_contract=execution.get("execution_timing_contract"), + ) + + +def _execution_already_recorded_message(*, config: LongBridgeRebalanceConfig, marker_key: str, execution: dict) -> str: + message = config.translator( + "execution_already_recorded", + marker=marker_key, + signal_date=str(execution.get("signal_date") or ""), + effective_date=str(execution.get("effective_date") or ""), + ) + if not message or message == "execution_already_recorded": + message = f"Execution already recorded for signal={execution.get('signal_date')} effective={execution.get('effective_date')}" + return message + + +def _should_record_execution_marker(*, result: ExecutionCycleResult, config: LongBridgeRebalanceConfig) -> bool: + if not getattr(config, "execution_dedup_enabled", False): + return False + if bool(getattr(config, "dry_run_only", False)) and tuple(getattr(result, "dry_run_orders", ()) or ()): + return True + return bool(getattr(result, "action_done", False)) + + +def _record_execution_marker( + *, + config: LongBridgeRebalanceConfig, + marker_key: str, + result: ExecutionCycleResult, + notify_issue, +) -> None: + store = getattr(config, "execution_state_store", None) + if not store or not marker_key: + return + try: + store.record_marker( + marker_key, + metadata={ + "strategy_profile": getattr(config, "strategy_profile", ""), + "account_scope": getattr(config, "execution_state_account_scope", ""), + "dry_run_only": bool(getattr(config, "dry_run_only", False)), + "action_done": bool(getattr(result, "action_done", False)), + "dry_run_orders_count": len(tuple(getattr(result, "dry_run_orders", ()) or ())), + "signal_date": str(dict(getattr(result, "execution", {}) or {}).get("signal_date") or ""), + "effective_date": str(dict(getattr(result, "execution", {}) or {}).get("effective_date") or ""), + }, + ) + except Exception as exc: + notify_issue( + "Execution marker write failed", + f"Marker: {marker_key}\n{type(exc).__name__}: {exc}", + ) + + def run_strategy( *, @@ -200,29 +266,83 @@ def fetch_replanned_state(): plan, portfolio, execution, allocation = fetch_replanned_state() - execution_result = execute_rebalance_cycle( - trade_context=trade_context, - plan=plan, - portfolio=portfolio, - execution=execution, - allocation=allocation, - fetch_replanned_state=fetch_replanned_state, - market_data_port=market_data_port, - estimate_max_purchase_quantity=runtime.estimate_max_purchase_quantity, - execution_port=execution_port, - post_submit_order=runtime.post_submit_order, - notify_issue=runtime.notify_issue, - translator=config.translator, - with_prefix=config.with_prefix, - limit_sell_discount=config.limit_sell_discount, - limit_buy_premium=config.limit_buy_premium, - dry_run_only=config.dry_run_only, - symbol_suffix=config.symbol_suffix, - post_sell_refresh_attempts=config.post_sell_refresh_attempts, - post_sell_refresh_interval_sec=config.post_sell_refresh_interval_sec, - sleeper=config.sleeper or _noop_sleep, - safe_haven_cash_substitute_threshold_usd=config.safe_haven_cash_substitute_threshold_usd, - ) + execution_marker_key = _build_execution_marker_key(config=config, execution=execution) + execution_state_store = getattr(config, "execution_state_store", None) + execution_already_recorded = False + if execution_marker_key and execution_state_store: + try: + execution_already_recorded = bool(execution_state_store.has_marker(execution_marker_key)) + except Exception as exc: + runtime.notify_issue( + "Execution marker read failed", + f"Marker: {execution_marker_key}\n{type(exc).__name__}: {exc}", + ) + if not execution_already_recorded and hasattr(execution_state_store, "has_prior_execution_report"): + try: + execution_already_recorded = bool( + execution_state_store.has_prior_execution_report( + platform="longbridge", + strategy_profile=getattr(config, "strategy_profile", "") or "unknown", + account_scope=getattr(config, "execution_state_account_scope", "") or "unknown", + signal_date=execution.get("signal_date"), + effective_date=execution.get("effective_date"), + dry_run_only=bool(getattr(config, "dry_run_only", False)), + ) + ) + except Exception as exc: + runtime.notify_issue( + "Execution report dedup read failed", + f"Marker: {execution_marker_key}\n{type(exc).__name__}: {exc}", + ) + + if execution_already_recorded: + message = _execution_already_recorded_message( + config=config, + marker_key=execution_marker_key, + execution=execution, + ) + print(config.with_prefix(message), flush=True) + execution_result = ExecutionCycleResult( + plan=plan, + portfolio=portfolio, + execution=execution, + allocation=allocation, + logs=(), + skip_logs=(), + note_logs=(message,), + action_done=False, + ) + else: + execution_result = execute_rebalance_cycle( + trade_context=trade_context, + plan=plan, + portfolio=portfolio, + execution=execution, + allocation=allocation, + fetch_replanned_state=fetch_replanned_state, + market_data_port=market_data_port, + estimate_max_purchase_quantity=runtime.estimate_max_purchase_quantity, + execution_port=execution_port, + post_submit_order=runtime.post_submit_order, + notify_issue=runtime.notify_issue, + translator=config.translator, + with_prefix=config.with_prefix, + limit_sell_discount=config.limit_sell_discount, + limit_buy_premium=config.limit_buy_premium, + dry_run_only=config.dry_run_only, + symbol_suffix=config.symbol_suffix, + post_sell_refresh_attempts=config.post_sell_refresh_attempts, + post_sell_refresh_interval_sec=config.post_sell_refresh_interval_sec, + sleeper=config.sleeper or _noop_sleep, + safe_haven_cash_substitute_threshold_usd=config.safe_haven_cash_substitute_threshold_usd, + ) + if _should_record_execution_marker(result=execution_result, config=config): + _record_execution_marker( + config=config, + marker_key=execution_marker_key, + result=execution_result, + notify_issue=runtime.notify_issue, + ) execution = execution_result.execution execution["signal_snapshot"] = build_signal_snapshot( platform="longbridge", diff --git a/application/runtime_composer.py b/application/runtime_composer.py index 864a963..06b0e90 100644 --- a/application/runtime_composer.py +++ b/application/runtime_composer.py @@ -8,6 +8,10 @@ from application.runtime_bootstrap_adapters import build_runtime_bootstrap from application.runtime_dependencies import LongBridgeRebalanceConfig, LongBridgeRebalanceRuntime +from application.execution_state import ( + build_execution_marker_store_from_env, + resolve_execution_dedup_enabled, +) from application.runtime_notification_adapters import build_runtime_notification_adapters from application.runtime_reporting_adapters import build_runtime_reporting_adapters from quant_platform_kit.common.port_adapters import CallableNotificationPort @@ -209,6 +213,15 @@ def build_rebalance_config(self, *, strategy_plugin_signals=()) -> LongBridgeReb sleeper=self.sleeper, extra_notification_lines=(market_scope_line,), strategy_plugin_signals=tuple(strategy_plugin_signals or ()), + execution_dedup_enabled=resolve_execution_dedup_enabled( + env_reader=self.env_reader, + dry_run_only=self.dry_run_only, + ), + execution_state_store=build_execution_marker_store_from_env( + env_reader=self.env_reader, + gcp_project_id=self.project_id, + ), + execution_state_account_scope=self.account_region, ) def load_strategy_plugin_signals(self, raw_mounts): diff --git a/application/runtime_dependencies.py b/application/runtime_dependencies.py index e64c93c..f410b6d 100644 --- a/application/runtime_dependencies.py +++ b/application/runtime_dependencies.py @@ -26,6 +26,9 @@ class LongBridgeRebalanceConfig: sleeper: Callable[[float], None] | None = None extra_notification_lines: tuple[str, ...] = () strategy_plugin_signals: tuple[Any, ...] = () + execution_dedup_enabled: bool = False + execution_state_store: Any = None + execution_state_account_scope: str = "" @dataclass(frozen=True) diff --git a/notifications/renderers.py b/notifications/renderers.py index 8028c27..f1048b3 100644 --- a/notifications/renderers.py +++ b/notifications/renderers.py @@ -193,16 +193,20 @@ def _build_benchmark_lines(execution, *, translator): ] -def _format_dashboard_text(text) -> str: - return "\n".join( - line.rstrip() - for line in str(text or "").splitlines() - if line.strip() - ) +def _format_dashboard_text(text, *, translator=None) -> str: + lines = [] + for raw_line in str(text or "").splitlines(): + line = raw_line.rstrip() + if not line.strip(): + continue + if translator is not None and _translator_uses_zh(translator): + line = _localize_notification_text(line, translator=translator) + lines.append(line) + return "\n".join(lines) -def _append_dashboard_block(lines, *, execution, separator) -> None: - dashboard_text = _format_dashboard_text(execution.get("dashboard_text")) +def _append_dashboard_block(lines, *, execution, separator, translator) -> None: + dashboard_text = _format_dashboard_text(execution.get("dashboard_text"), translator=translator) if dashboard_text: lines.append(separator) lines.extend(dashboard_text.splitlines()) @@ -369,7 +373,7 @@ def render_rebalance_notification( if dry_run_only: detailed_lines.append(translator("dry_run_banner")) _append_extra_notification_lines(detailed_lines, extra_notification_lines) - _append_dashboard_block(detailed_lines, execution=execution, separator=separator) + _append_dashboard_block(detailed_lines, execution=execution, separator=separator, translator=translator) _append_timing_lines(detailed_lines, execution=execution, translator=translator) _append_signal_snapshot_line(detailed_lines, execution=execution, translator=translator) _append_source_input_line(detailed_lines, execution=execution, translator=translator) @@ -386,7 +390,7 @@ def render_rebalance_notification( if dry_run_only: compact_lines.append(translator("dry_run_banner")) _append_extra_notification_lines(compact_lines, extra_notification_lines) - _append_dashboard_block(compact_lines, execution=execution, separator=separator) + _append_dashboard_block(compact_lines, execution=execution, separator=separator, translator=translator) _append_timing_lines(compact_lines, execution=execution, translator=translator) _append_signal_snapshot_line(compact_lines, execution=execution, translator=translator) _append_source_input_line(compact_lines, execution=execution, translator=translator) @@ -419,7 +423,7 @@ def render_heartbeat_notification( if dry_run_only: detailed_lines.append(translator("dry_run_banner")) _append_extra_notification_lines(detailed_lines, extra_notification_lines) - _append_dashboard_block(detailed_lines, execution=execution, separator=separator) + _append_dashboard_block(detailed_lines, execution=execution, separator=separator, translator=translator) _append_timing_lines(detailed_lines, execution=execution, translator=translator) _append_signal_snapshot_line(detailed_lines, execution=execution, translator=translator) _append_source_input_line(detailed_lines, execution=execution, translator=translator) @@ -455,7 +459,7 @@ def render_heartbeat_notification( if dry_run_only: compact_lines.append(translator("dry_run_banner")) _append_extra_notification_lines(compact_lines, extra_notification_lines) - _append_dashboard_block(compact_lines, execution=execution, separator=separator) + _append_dashboard_block(compact_lines, execution=execution, separator=separator, translator=translator) _append_timing_lines(compact_lines, execution=execution, translator=translator) _append_signal_snapshot_line(compact_lines, execution=execution, translator=translator) _append_source_input_line(compact_lines, execution=execution, translator=translator) diff --git a/notifications/telegram.py b/notifications/telegram.py index c1ab9eb..428ed10 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -66,6 +66,8 @@ "buy_deferred_small_account_cash_substitution": "{symbol} 目标金额 ${diff} 低于 1 股价格 ${price};为避免超过目标仓位,小账户本轮保留现金,不回补 {cash_symbols}", "buy_deferred_small_cash": "{symbol} 目标差额 ${diff},但可投资现金 ${investable} 不足买入 1 股(价格 ${price})", "buy_deferred_cash_limit": "{symbol} 目标差额 ${diff},预算可买 {budget_qty} 股,但券商估算可买数量为 0;可能有未完成挂单、结算或购买力占用", + "buy_deferred_cash_sweep_cash_limit": "{symbol} 剩余可投资现金 ${investable},预算可回补 {budget_qty} 股,但券商估算可买数量为 0;可能有未完成挂单、结算或购买力占用", + "execution_already_recorded": "已跳过重复执行:信号日 {signal_date} / 执行日 {effective_date} 已记录,本轮不再生成订单", "cash_sweep_rebuy": "🏦 [尾部回补] 剩余可投资现金回补 {symbol}: {qty}股 @ ${price}", "limit_buy": "📈 [限价买入] {symbol}: {qty}股 @ ${price}", "market_buy": "📈 [市价买入] {symbol}: {qty}股 @ ${price}", @@ -186,6 +188,8 @@ "buy_deferred_small_account_cash_substitution": "{symbol} target ${diff} is below the 1-share price ${price}; to avoid exceeding the target allocation, this small account keeps cash this cycle and does not rebuy {cash_symbols}", "buy_deferred_small_cash": "{symbol} target gap ${diff}, but investable cash ${investable} is not enough for 1 share at ${price}", "buy_deferred_cash_limit": "{symbol} target gap ${diff}, budget supports {budget_qty} shares, but broker estimate returned 0; an open order, settlement, or buying-power hold may still be blocking funds", + "buy_deferred_cash_sweep_cash_limit": "{symbol} residual investable cash ${investable}, budget supports {budget_qty} tail-rebuy shares, but broker estimate returned 0; an open order, settlement, or buying-power hold may still be blocking funds", + "execution_already_recorded": "Duplicate execution skipped: signal date {signal_date} / effective date {effective_date} is already recorded; no orders will be generated this cycle", "cash_sweep_rebuy": "🏦 [tail rebuy] residual investable cash rebought {symbol}: {qty} shares @ ${price}", "limit_buy": "📈 [Limit buy] {symbol}: {qty} shares @ ${price}", "market_buy": "📈 [Market buy] {symbol}: {qty} shares @ ${price}", diff --git a/tests/test_notifications.py b/tests/test_notifications.py index 8425171..fade450 100644 --- a/tests/test_notifications.py +++ b/tests/test_notifications.py @@ -150,6 +150,12 @@ def test_heartbeat_signal_snapshot_localizes_price_source(self): def test_heartbeat_localizes_strategy_diagnostics_and_source_input_status(self): rendered = render_heartbeat_notification( execution={ + "dashboard_text": ( + "📌 策略账户概览\n" + "🎯 信号: regime=risk_on breadth=68.0% benchmark_trend=up " + "target_stock=100.0% realized_stock=100.0% selected=4 " + "top=MU(4.07), INTC(2.23), AMD(1.96)" + ), "signal_snapshot": { "market_date": "", "latest_price_source": "longbridge_candlesticks", @@ -173,6 +179,12 @@ def test_heartbeat_localizes_strategy_diagnostics_and_source_input_status(self): ) self.assertIn("🧩 输入状态: 价格 2026-06-01 | 股票池 2026-05-14 | 状态 部分行情刷新", rendered.compact_text) + self.assertIn( + "🎯 信号: 市场阶段=进攻 市场宽度=68.0% 基准趋势=向上 " + "目标股票仓位=100.0% 实际股票仓位=100.0% 入选标的数=4 " + "前排标的=MU(4.07), INTC(2.23), AMD(1.96)", + rendered.compact_text, + ) self.assertIn("📊 市场状态: 市场阶段=进攻", rendered.compact_text) self.assertIn( "🎯 信号: 市场阶段=进攻 市场宽度=68.0% 基准趋势=向上 " @@ -181,6 +193,7 @@ def test_heartbeat_localizes_strategy_diagnostics_and_source_input_status(self): rendered.compact_text, ) self.assertNotIn("regime=risk_on", rendered.compact_text) + self.assertNotIn("benchmark_trend=up", rendered.compact_text) self.assertNotIn("target_stock=", rendered.compact_text) self.assertNotIn("partial_history_refresh", rendered.compact_text) diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index 0f2d170..e65f646 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -543,6 +543,231 @@ def test_run_strategy_supports_execution_port_runtime_path(self): self.assertEqual(len(sent_messages), 1) self.assertIn("【调仓", sent_messages[0]) + def test_run_strategy_skips_when_execution_marker_already_exists(self): + sent_messages = [] + checked_keys = [] + plan = _build_plan( + strategy_symbols=("SOXX",), + risk_symbols=("SOXX",), + targets={"SOXX": 500.0}, + market_values={"SOXX": 0.0}, + sellable_quantities={"SOXX": 0}, + quantities={"SOXX": 0}, + current_min_trade=10.0, + trade_threshold_value=10.0, + investable_cash=600.0, + market_status="Risk on", + deploy_ratio_text="100.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="duplicate signal", + available_cash=600.0, + total_strategy_equity=600.0, + portfolio_rows=(("SOXX",),), + signal_date="2026-06-01", + effective_date="2026-06-02", + ) + + class FakeStore: + def has_marker(self, marker_key): + checked_keys.append(marker_key) + return True + + def record_marker(self, *_args, **_kwargs): + raise AssertionError("duplicate run must not record a new marker") + + rebalance_service.run_strategy( + runtime=LongBridgeRebalanceRuntime( + bootstrap=lambda: ("quote-context", "trade-context", {"trend": "ok"}), + resolve_rebalance_plan=lambda *, indicators, snapshot=None, account_state=None: plan, + market_data_port_factory=lambda _quote_context: CallableMarketDataPort( + quote_loader=lambda _symbol: (_ for _ in ()).throw(AssertionError("quote should not load")) + ), + estimate_max_purchase_quantity=lambda *args, **kwargs: (_ for _ in ()).throw( + AssertionError("buy estimate should not run") + ), + notifications=CallableNotificationPort(sent_messages.append), + notify_issue=lambda title, detail: sent_messages.append(f"{title}\n{detail}"), + portfolio_port_factory=lambda _quote_context, _trade_context: CallablePortfolioPort( + lambda: _build_snapshot(plan) + ), + execution_port_factory=lambda _trade_context: CallableExecutionPort( + lambda _order_intent: (_ for _ in ()).throw(AssertionError("order should not submit")) + ), + ), + config=LongBridgeRebalanceConfig( + limit_sell_discount=0.995, + limit_buy_premium=1.005, + separator="━━━━━━━━━━━━━━━━━━", + translator=build_translator("zh"), + with_prefix=lambda message: f"[PAPER/LongBridgeQuant] {message}", + strategy_profile="soxl_soxx_trend_income", + strategy_display_name="SOXL/SOXX 半导体趋势收益", + dry_run_only=True, + execution_dedup_enabled=True, + execution_state_store=FakeStore(), + execution_state_account_scope="PAPER", + ), + ) + + self.assertEqual(len(checked_keys), 1) + self.assertIn("paper", checked_keys[0]) + self.assertEqual(len(sent_messages), 1) + self.assertIn("已跳过重复执行", sent_messages[0]) + self.assertIn("2026-06-01", sent_messages[0]) + self.assertNotIn("限价买入", sent_messages[0]) + + def test_run_strategy_skips_when_prior_report_matches_execution_signal(self): + sent_messages = [] + report_checks = [] + plan = _build_plan( + strategy_symbols=("SOXX",), + risk_symbols=("SOXX",), + targets={"SOXX": 500.0}, + market_values={"SOXX": 0.0}, + sellable_quantities={"SOXX": 0}, + quantities={"SOXX": 0}, + current_min_trade=10.0, + trade_threshold_value=10.0, + investable_cash=600.0, + market_status="Risk on", + deploy_ratio_text="100.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="duplicate signal", + available_cash=600.0, + total_strategy_equity=600.0, + portfolio_rows=(("SOXX",),), + signal_date="2026-06-01", + effective_date="2026-06-02", + ) + + class FakeStore: + def has_marker(self, _marker_key): + return False + + def has_prior_execution_report(self, **kwargs): + report_checks.append(kwargs) + return True + + def record_marker(self, *_args, **_kwargs): + raise AssertionError("historical report duplicate must not record a new marker") + + rebalance_service.run_strategy( + runtime=LongBridgeRebalanceRuntime( + bootstrap=lambda: ("quote-context", "trade-context", {"trend": "ok"}), + resolve_rebalance_plan=lambda *, indicators, snapshot=None, account_state=None: plan, + market_data_port_factory=lambda _quote_context: CallableMarketDataPort( + quote_loader=lambda _symbol: (_ for _ in ()).throw(AssertionError("quote should not load")) + ), + estimate_max_purchase_quantity=lambda *args, **kwargs: (_ for _ in ()).throw( + AssertionError("buy estimate should not run") + ), + notifications=CallableNotificationPort(sent_messages.append), + notify_issue=lambda title, detail: sent_messages.append(f"{title}\n{detail}"), + portfolio_port_factory=lambda _quote_context, _trade_context: CallablePortfolioPort( + lambda: _build_snapshot(plan) + ), + execution_port_factory=lambda _trade_context: CallableExecutionPort( + lambda _order_intent: (_ for _ in ()).throw(AssertionError("order should not submit")) + ), + ), + config=LongBridgeRebalanceConfig( + limit_sell_discount=0.995, + limit_buy_premium=1.005, + separator="━━━━━━━━━━━━━━━━━━", + translator=build_translator("zh"), + with_prefix=lambda message: f"[PAPER/LongBridgeQuant] {message}", + strategy_profile="soxl_soxx_trend_income", + strategy_display_name="SOXL/SOXX 半导体趋势收益", + dry_run_only=True, + execution_dedup_enabled=True, + execution_state_store=FakeStore(), + execution_state_account_scope="PAPER", + ), + ) + + self.assertEqual(len(report_checks), 1) + self.assertEqual(report_checks[0]["signal_date"], "2026-06-01") + self.assertEqual(report_checks[0]["effective_date"], "2026-06-02") + self.assertEqual(len(sent_messages), 1) + self.assertIn("已跳过重复执行", sent_messages[0]) + self.assertNotIn("限价买入", sent_messages[0]) + + def test_run_strategy_records_execution_marker_after_dry_run_order_preview(self): + sent_messages = [] + recorded_markers = [] + plan = _build_plan( + strategy_symbols=("BOXX",), + safe_haven_symbols=("BOXX",), + targets={"BOXX": 1150.0}, + market_values={"BOXX": 0.0}, + sellable_quantities={"BOXX": 0}, + quantities={"BOXX": 0}, + current_min_trade=10.0, + trade_threshold_value=10.0, + investable_cash=1200.0, + market_status="Cash sweep", + deploy_ratio_text="0.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="cash sweep", + available_cash=1200.0, + total_strategy_equity=1200.0, + portfolio_rows=(("BOXX",),), + signal_date="2026-06-01", + effective_date="2026-06-02", + ) + + class FakeStore: + def has_marker(self, _marker_key): + return False + + def record_marker(self, marker_key, *, metadata=None): + recorded_markers.append((marker_key, dict(metadata or {}))) + + rebalance_service.run_strategy( + runtime=LongBridgeRebalanceRuntime( + bootstrap=lambda: ("quote-context", "trade-context", {"trend": "ok"}), + resolve_rebalance_plan=lambda *, indicators, snapshot=None, account_state=None: plan, + market_data_port_factory=lambda _quote_context: CallableMarketDataPort( + quote_loader=lambda symbol: QuoteSnapshot( + symbol=symbol, + as_of="2026-06-01", + last_price=100.0, + ) + ), + estimate_max_purchase_quantity=lambda *args, **kwargs: 10, + notifications=CallableNotificationPort(sent_messages.append), + notify_issue=lambda title, detail: sent_messages.append(f"{title}\n{detail}"), + portfolio_port_factory=lambda _quote_context, _trade_context: CallablePortfolioPort( + lambda: _build_snapshot(plan) + ), + execution_port_factory=lambda _trade_context: CallableExecutionPort( + lambda _order_intent: (_ for _ in ()).throw(AssertionError("dry-run should not submit")) + ), + ), + config=LongBridgeRebalanceConfig( + limit_sell_discount=0.995, + limit_buy_premium=1.0, + separator="━━━━━━━━━━━━━━━━━━", + translator=build_translator("zh"), + with_prefix=lambda message: f"[PAPER/LongBridgeQuant] {message}", + strategy_profile="soxl_soxx_trend_income", + strategy_display_name="SOXL/SOXX 半导体趋势收益", + dry_run_only=True, + execution_dedup_enabled=True, + execution_state_store=FakeStore(), + execution_state_account_scope="PAPER", + ), + ) + + self.assertEqual(len(sent_messages), 1) + self.assertIn("🧪 模拟限价买入 BOXX.US", sent_messages[0]) + self.assertEqual(len(recorded_markers), 1) + self.assertIn("2026-06-01", recorded_markers[0][0]) + self.assertTrue(recorded_markers[0][1]["dry_run_only"]) + def test_append_status_lines_localizes_snapshot_guard_text_for_zh(self): lines = [] rebalance_service._append_status_lines( @@ -1054,7 +1279,7 @@ def test_limit_sell_floors_to_whole_shares(self): self.assertIn("限价卖出] BOXX: 1股", sent_messages[0]) self.assertNotIn("限价卖出] BOXX: 1.5股", sent_messages[0]) - def test_market_buy_floors_to_whole_shares(self): + def test_cash_sweep_buy_uses_marketable_limit_and_floors_to_whole_shares(self): plan = _build_plan( strategy_symbols=("BOXX",), safe_haven_symbols=("BOXX",), @@ -1082,8 +1307,8 @@ def test_market_buy_floors_to_whole_shares(self): ) self.assertEqual(len(sent_messages), 1) - self.assertIn("市价买入] BOXX: 10股", sent_messages[0]) - self.assertNotIn("市价买入] BOXX: 10.5股", sent_messages[0]) + self.assertIn("限价买入] BOXX: 10股", sent_messages[0]) + self.assertNotIn("限价买入] BOXX: 10.5股", sent_messages[0]) def test_zero_target_sell_uses_sellable_quantity_not_price_derived_floor(self): plan = _build_plan( @@ -1611,7 +1836,41 @@ def test_dry_run_rebuys_cash_sweep_symbol_with_remaining_investable_cash(self): self.assertEqual(len(sent_messages), 1) self.assertIn("🧪 模拟运行模式", sent_messages[0]) - self.assertIn("🧪 模拟市价买入 BOXX.US", sent_messages[0]) + self.assertIn("🧪 模拟限价买入 BOXX.US", sent_messages[0]) + + def test_cash_sweep_rebuy_skips_when_broker_estimate_is_zero(self): + initial_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 0.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 0.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 0}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 0}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=1200.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="0.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="无其他买单,仅保留现金回补", + available_cash=1200.0, + total_strategy_equity=1200.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + + sent_messages, _, _ = self._run_strategy( + initial_plan, + prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0}, + estimate_max_purchase_quantity_value=0, + ) + + self.assertEqual(len(sent_messages), 1) + self.assertIn("💓 【心跳检测】", sent_messages[0]) + self.assertIn("BOXX.US 剩余可投资现金 $1200.00", sent_messages[0]) + self.assertIn("券商估算可买数量为 0", sent_messages[0]) + self.assertNotIn("市价买入] BOXX", sent_messages[0]) def test_retries_account_refresh_after_sell_until_buying_power_updates(self): initial_plan = _build_plan( diff --git a/tests/test_runtime_composer.py b/tests/test_runtime_composer.py index 4e279d6..6b1758a 100644 --- a/tests/test_runtime_composer.py +++ b/tests/test_runtime_composer.py @@ -131,3 +131,6 @@ def fake_bootstrap_builder(**kwargs): assert config.strategy_display_name == "SOXL/SOXX 半导体趋势收益" assert config.dry_run_only is True assert config.safe_haven_cash_substitute_threshold_usd == 1000.0 + assert config.execution_dedup_enabled is True + assert config.execution_state_account_scope == "HK" + assert config.execution_state_store.gcs_prefix_uri == "gs://bucket/runtime-reports"