From a90124fb32383c3f8135b2302367037c11ab4489 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Sun, 7 Jun 2026 02:24:58 +0800 Subject: [PATCH] Improve manual review notification cards --- .../market_regime_control_plugin.py | 12 +- .../strategy_plugin_runner.py | 294 +++++++++++++++++- tests/test_strategy_plugin_runner.py | 17 + 3 files changed, 321 insertions(+), 2 deletions(-) diff --git a/src/quant_strategy_plugins/market_regime_control_plugin.py b/src/quant_strategy_plugins/market_regime_control_plugin.py index 5444b5a..2f58553 100644 --- a/src/quant_strategy_plugins/market_regime_control_plugin.py +++ b/src/quant_strategy_plugins/market_regime_control_plugin.py @@ -177,7 +177,17 @@ def _compact_signal(payload: Mapping[str, Any] | None) -> dict[str, Any]: ): if key in payload: compact[key] = payload.get(key) - for key in ("data_freshness", "data_quality", "event_quality", "audit_summary"): + for key in ( + "data_freshness", + "data_quality", + "event_quality", + "panic_reversal_quality", + "audit_summary", + "metrics", + "rebound_confirmation", + "reversal_confirmation", + "selected_event", + ): value = payload.get(key) if isinstance(value, Mapping): compact[key] = dict(value) diff --git a/src/quant_strategy_plugins/strategy_plugin_runner.py b/src/quant_strategy_plugins/strategy_plugin_runner.py index 136bf52..758d183 100644 --- a/src/quant_strategy_plugins/strategy_plugin_runner.py +++ b/src/quant_strategy_plugins/strategy_plugin_runner.py @@ -817,6 +817,290 @@ def _payload_should_notify(payload: Mapping[str, Any], route: str) -> bool: return route != "no_action" +def _as_float_or_none(value: Any) -> float | None: + try: + result = float(value) + except (TypeError, ValueError): + return None + return result if pd.notna(result) else None + + +def _format_number(value: Any, *, digits: int = 2) -> str: + number = _as_float_or_none(value) + return "n/a" if number is None else f"{number:.{digits}f}" + + +def _format_pct(value: Any, *, digits: int = 1, signed: bool = False) -> str: + number = _as_float_or_none(value) + if number is None: + return "n/a" + sign = "+" if signed and number > 0 else "" + return f"{sign}{number * 100:.{digits}f}%" + + +def _component_payload(payload: Mapping[str, Any], component: str) -> Mapping[str, Any]: + components = _nested_mapping(payload, "component_signals") + value = components.get(component) + return value if isinstance(value, Mapping) and value.get("available", True) else {} + + +def _active_panic_payload(payload: Mapping[str, Any], plugin: str) -> Mapping[str, Any]: + if plugin == PLUGIN_PANIC_REVERSAL_SHADOW or "reversal_confirmation" in payload: + return payload + component = _component_payload(payload, "panic_reversal") + if not component: + return {} + if _as_bool(component.get("manual_review_required"), default=False) or _as_bool( + component.get("panic_reversal_context_active"), + default=False, + ): + return component + return {} + + +def _active_taco_payload(payload: Mapping[str, Any], plugin: str) -> Mapping[str, Any]: + if plugin == PLUGIN_TACO_REBOUND_SHADOW or "rebound_confirmation" in payload: + return payload + component = _component_payload(payload, "taco") + if not component: + return {} + if _as_bool(component.get("manual_review_required"), default=False) or _as_bool( + component.get("rebound_context_active"), + default=False, + ): + return component + return {} + + +def _review_attack_symbol(*sources: Mapping[str, Any]) -> str: + for source in sources: + for container_key in ("metrics", "rebound_confirmation"): + container = source.get(container_key) + if isinstance(container, Mapping): + symbol = str(container.get("attack_symbol") or "").strip().upper() + if symbol: + return symbol + return "" + + +def _manual_review_source_title( + *, + panic_payload: Mapping[str, Any], + taco_payload: Mapping[str, Any], + locale: str, +) -> str: + panic_active = bool(panic_payload) + taco_active = bool(taco_payload) + if locale == "zh-CN": + if panic_active and taco_active: + return "事件缓和 + VIX 恐慌反转共振" + if panic_active: + return "VIX 恐慌反转" + if taco_active: + return "TACO 事件反弹" + return "机会观察" + if panic_active and taco_active: + return "event de-escalation + VIX panic reversal" + if panic_active: + return "VIX panic reversal" + if taco_active: + return "TACO event rebound" + return "opportunity watch" + + +def _append_panic_review_lines(lines: list[str], panic_payload: Mapping[str, Any], *, locale: str) -> None: + metrics = _nested_mapping(panic_payload, "metrics") + confirmation = _nested_mapping(panic_payload, "reversal_confirmation") + thresholds = _nested_mapping(confirmation, "thresholds") + benchmark = str(metrics.get("benchmark_symbol") or "benchmark").upper() + attack = str(metrics.get("attack_symbol") or "attack").upper() + if locale == "zh-CN": + lines.extend( + [ + "触发原因:", + ( + "- VIX 曾达到恐慌区间:" + f"{int(_as_float_or_none(thresholds.get('vix_high_lookback_days')) or 5)} 日高点 " + f"{_format_number(metrics.get('vix_lookback_high'))},阈值 " + f"{_format_number(thresholds.get('min_vix_high'))}" + ), + ( + "- VIX 已从高点回落:" + f"当前 {_format_number(metrics.get('vix'))},较高点回落 " + f"{_format_pct(metrics.get('vix_pullback_from_high'))}" + ), + ( + "- VIX 继续下降:" + f"前值 {_format_number(metrics.get('vix_previous'))},当前 {_format_number(metrics.get('vix'))}" + ), + ( + "- VIX/VIX3M = " + f"{_format_number(metrics.get('vix_vix3m_ratio'))},用于确认恐慌结构仍可观测" + ), + ( + f"- {benchmark} 3 日收益 {_format_pct(metrics.get('benchmark_3d_return'), signed=True)}," + f"从近 5 日低点反弹 {_format_pct(metrics.get('benchmark_rebound_from_recent_low'), signed=True)}" + ), + f"- {attack} 从近 5 日低点反弹 {_format_pct(metrics.get('attack_rebound_from_recent_low'), signed=True)}", + ] + ) + return + lines.extend( + [ + "Trigger evidence:", + ( + "- VIX reached panic territory: " + f"{int(_as_float_or_none(thresholds.get('vix_high_lookback_days')) or 5)}-day high " + f"{_format_number(metrics.get('vix_lookback_high'))}; threshold " + f"{_format_number(thresholds.get('min_vix_high'))}" + ), + ( + "- VIX has pulled back from the high: " + f"current {_format_number(metrics.get('vix'))}, pullback " + f"{_format_pct(metrics.get('vix_pullback_from_high'))}" + ), + ( + "- VIX is still falling: " + f"previous {_format_number(metrics.get('vix_previous'))}, current {_format_number(metrics.get('vix'))}" + ), + f"- VIX/VIX3M = {_format_number(metrics.get('vix_vix3m_ratio'))}", + ( + f"- {benchmark} 3-day return {_format_pct(metrics.get('benchmark_3d_return'), signed=True)}; " + f"rebound from recent low {_format_pct(metrics.get('benchmark_rebound_from_recent_low'), signed=True)}" + ), + f"- {attack} rebound from recent low {_format_pct(metrics.get('attack_rebound_from_recent_low'), signed=True)}", + ] + ) + + +def _append_taco_review_lines(lines: list[str], taco_payload: Mapping[str, Any], *, locale: str) -> None: + event = _nested_mapping(taco_payload, "selected_event") + confirmation = _nested_mapping(taco_payload, "rebound_confirmation") + benchmark = str(confirmation.get("benchmark_symbol") or "benchmark").upper() + attack = str(confirmation.get("attack_symbol") or "attack").upper() + if locale == "zh-CN": + lines.extend( + [ + "事件:", + f"- 类型:{event.get('kind') or 'n/a'} / 区域:{event.get('region') or 'n/a'}", + f"- 日期:{event.get('event_date') or 'n/a'}", + f"- 标题:{event.get('title') or 'n/a'}", + f"- 来源:{event.get('source') or 'n/a'}", + "价格确认:", + f"- 事件后已过 {confirmation.get('trading_days_after_event', 'n/a')} 个交易日", + ( + f"- {benchmark} 3 日收益 {_format_pct(confirmation.get('benchmark_3d_return'), signed=True)}," + f"从近 5 日低点反弹 " + f"{_format_pct(confirmation.get('benchmark_rebound_from_recent_low'), signed=True)}" + ), + f"- {attack} 从近 5 日低点反弹 {_format_pct(confirmation.get('attack_rebound_from_recent_low'), signed=True)}", + ] + ) + return + lines.extend( + [ + "Event:", + f"- Type: {event.get('kind') or 'n/a'} / region: {event.get('region') or 'n/a'}", + f"- Date: {event.get('event_date') or 'n/a'}", + f"- Title: {event.get('title') or 'n/a'}", + f"- Source: {event.get('source') or 'n/a'}", + "Price confirmation:", + f"- {confirmation.get('trading_days_after_event', 'n/a')} trading days after the event", + ( + f"- {benchmark} 3-day return {_format_pct(confirmation.get('benchmark_3d_return'), signed=True)}; " + f"rebound from recent low " + f"{_format_pct(confirmation.get('benchmark_rebound_from_recent_low'), signed=True)}" + ), + f"- {attack} rebound from recent low {_format_pct(confirmation.get('attack_rebound_from_recent_low'), signed=True)}", + ] + ) + + +def _format_manual_review_notification_message( + payload: Mapping[str, Any], + *, + locale: str, + target_label: str, + plugin: str, + as_of: str, + route: str, +) -> str | None: + if _payload_action(payload) != "notify_manual_review": + return None + panic_payload = _active_panic_payload(payload, plugin) + taco_payload = _active_taco_payload(payload, plugin) + if not panic_payload and not taco_payload: + return None + + position_control = _nested_mapping(payload, "position_control") + vetoes = _message_reason_codes(position_control.get("vetoes") or _nested_mapping(payload, "arbiter").get("vetoes")) + attack_symbol = _review_attack_symbol(panic_payload, taco_payload) or target_label + source_title = _manual_review_source_title(panic_payload=panic_payload, taco_payload=taco_payload, locale=locale) + if locale == "zh-CN": + lines = [ + f"【机会复核|{attack_symbol}|{source_title}】", + f"日期:{as_of or '未知日期'}", + "结论:触发人工复核,不自动加仓。", + ( + f"仲裁:{plugin} = {route};" + f"{'crisis/macro 未 veto' if not vetoes else '存在 veto:' + _message_join(vetoes, locale)}。" + ), + "执行权限:只通知;不下单;不修改仓位。", + ] + if position_control: + scalar_bits = [] + if "taco_size_scalar" in position_control: + scalar_bits.append(f"taco_size_scalar = {_format_number(position_control.get('taco_size_scalar'))}") + if "panic_reversal_size_scalar" in position_control: + scalar_bits.append( + f"panic_reversal_size_scalar = {_format_number(position_control.get('panic_reversal_size_scalar'))}" + ) + if scalar_bits: + lines.append("仓位权限:" + ";".join(scalar_bits) + "。") + if panic_payload: + _append_panic_review_lines(lines, panic_payload, locale=locale) + if taco_payload: + _append_taco_review_lines(lines, taco_payload, locale=locale) + lines.extend( + [ + "人工复核建议:", + "- 只评估是否停止继续降风险或恢复观察,不作为自动买入信号。", + "- 若 crisis/macro 后续转为 risk_reduced 或 risk_off,本机会信号自动失效。", + ] + ) + return "\n".join(lines) + + lines = [ + f"[Opportunity Review | {attack_symbol} | {source_title}]", + f"Date: {as_of or 'unknown date'}", + "Conclusion: manual review triggered; no automatic position increase.", + f"Arbiter: {plugin} = {route}; {'crisis/macro did not veto' if not vetoes else 'vetoes: ' + _message_join(vetoes, locale)}.", + "Execution: notify only; no broker orders; no allocation mutation.", + ] + if position_control: + scalar_bits = [] + if "taco_size_scalar" in position_control: + scalar_bits.append(f"taco_size_scalar = {_format_number(position_control.get('taco_size_scalar'))}") + if "panic_reversal_size_scalar" in position_control: + scalar_bits.append( + f"panic_reversal_size_scalar = {_format_number(position_control.get('panic_reversal_size_scalar'))}" + ) + if scalar_bits: + lines.append("Position authority: " + "; ".join(scalar_bits) + ".") + if panic_payload: + _append_panic_review_lines(lines, panic_payload, locale=locale) + if taco_payload: + _append_taco_review_lines(lines, taco_payload, locale=locale) + lines.extend( + [ + "Manual review guidance:", + "- Review only whether to stop further de-risking or return to watch; this is not an automatic buy signal.", + "- If crisis/macro later moves to risk_reduced or risk_off, this opportunity signal is invalidated.", + ] + ) + return "\n".join(lines) + + def _format_notification_message( *, locale: str, @@ -898,7 +1182,15 @@ def _build_localized_messages( locale: list(_localized_reason_labels(reason_codes, locale)) for locale in SUPPORTED_MESSAGE_LOCALES } notification_messages = { - locale: _format_notification_message( + locale: _format_manual_review_notification_message( + payload, + locale=locale, + target_label=target_label, + plugin=plugin, + as_of=as_of, + route=route, + ) + or _format_notification_message( locale=locale, target_label=target_label, target_type=target_type, diff --git a/tests/test_strategy_plugin_runner.py b/tests/test_strategy_plugin_runner.py index 6fe47d1..42f3786 100644 --- a/tests/test_strategy_plugin_runner.py +++ b/tests/test_strategy_plugin_runner.py @@ -399,6 +399,13 @@ def test_strategy_plugin_runner_can_enable_panic_reversal_inside_market_regime_c assert payload["position_control"]["panic_reversal_size_scalar"] == 0.0 assert payload["position_control"]["taco_allowed"] is False assert "panic_reversal:panic_reversal" in payload["position_control"]["reason_codes"] + zh_notification = payload["notification"]["localized_messages"]["zh-CN"] + assert "【机会复核|TQQQ|VIX 恐慌反转】" in zh_notification + assert "结论:触发人工复核,不自动加仓。" in zh_notification + assert "VIX 曾达到恐慌区间" in zh_notification + assert "VIX 已从高点回落" in zh_notification + assert "QQQ 3 日收益" in zh_notification + assert "panic_reversal_size_scalar = 0.00" in zh_notification def test_strategy_plugin_runner_runs_general_market_regime_notification(tmp_path) -> None: @@ -704,6 +711,12 @@ def test_strategy_plugin_runner_runs_taco_rebound_notification_mount_for_tqqq(tm assert latest["rebound_confirmation"]["confirmed"] is True assert latest["would_trade_if_enabled"] is False assert "sleeve_suggestion" not in latest + zh_notification = latest["localized_messages"]["notification"]["zh-CN"] + assert "【机会复核|TQQQ|TACO 事件反弹】" in zh_notification + assert "结论:触发人工复核,不自动加仓。" in zh_notification + assert "事件:" in zh_notification + assert "价格确认:" in zh_notification + assert "人工复核建议:" in zh_notification def test_strategy_plugin_runner_can_enable_taco_ai_audit_without_api_key(tmp_path, monkeypatch) -> None: @@ -817,6 +830,10 @@ def test_strategy_plugin_runner_runs_panic_reversal_notification_mount_for_tqqq( assert latest["execution_controls"]["position_control_allowed"] is False assert latest["execution_controls"]["consumption_evidence_status"] == EVIDENCE_NOTIFICATION_ONLY assert latest["localized_messages"]["labels"]["canonical_route"]["zh-CN"] == "恐慌反转" + zh_notification = latest["notification"]["localized_messages"]["zh-CN"] + assert "【机会复核|TQQQ|VIX 恐慌反转】" in zh_notification + assert "执行权限:只通知;不下单;不修改仓位。" in zh_notification + assert "TQQQ 从近 5 日低点反弹" in zh_notification def test_strategy_plugin_runner_rejects_panic_reversal_for_soxl_strategy_mount(tmp_path) -> None: