Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/quant_strategy_plugins/market_regime_control_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,17 @@ def _compact_signal(payload: Mapping[str, Any] | None) -> dict[str, Any]:
):
if key in payload:
compact[key] = payload.get(key)
for key in ("data_freshness", "data_quality", "event_quality", "audit_summary"):
for key in (
"data_freshness",
"data_quality",
"event_quality",
"panic_reversal_quality",
"audit_summary",
"metrics",
"rebound_confirmation",
"reversal_confirmation",
"selected_event",
):
value = payload.get(key)
if isinstance(value, Mapping):
compact[key] = dict(value)
Expand Down
294 changes: 293 additions & 1 deletion src/quant_strategy_plugins/strategy_plugin_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,290 @@ def _payload_should_notify(payload: Mapping[str, Any], route: str) -> bool:
return route != "no_action"


def _as_float_or_none(value: Any) -> float | None:
try:
result = float(value)
except (TypeError, ValueError):
return None
return result if pd.notna(result) else None


def _format_number(value: Any, *, digits: int = 2) -> str:
number = _as_float_or_none(value)
return "n/a" if number is None else f"{number:.{digits}f}"


def _format_pct(value: Any, *, digits: int = 1, signed: bool = False) -> str:
number = _as_float_or_none(value)
if number is None:
return "n/a"
sign = "+" if signed and number > 0 else ""
return f"{sign}{number * 100:.{digits}f}%"


def _component_payload(payload: Mapping[str, Any], component: str) -> Mapping[str, Any]:
components = _nested_mapping(payload, "component_signals")
value = components.get(component)
return value if isinstance(value, Mapping) and value.get("available", True) else {}


def _active_panic_payload(payload: Mapping[str, Any], plugin: str) -> Mapping[str, Any]:
if plugin == PLUGIN_PANIC_REVERSAL_SHADOW or "reversal_confirmation" in payload:
return payload
component = _component_payload(payload, "panic_reversal")
if not component:
return {}
if _as_bool(component.get("manual_review_required"), default=False) or _as_bool(
component.get("panic_reversal_context_active"),
default=False,
):
return component
return {}


def _active_taco_payload(payload: Mapping[str, Any], plugin: str) -> Mapping[str, Any]:
if plugin == PLUGIN_TACO_REBOUND_SHADOW or "rebound_confirmation" in payload:
return payload
component = _component_payload(payload, "taco")
if not component:
return {}
if _as_bool(component.get("manual_review_required"), default=False) or _as_bool(
component.get("rebound_context_active"),
default=False,
):
return component
return {}


def _review_attack_symbol(*sources: Mapping[str, Any]) -> str:
for source in sources:
for container_key in ("metrics", "rebound_confirmation"):
container = source.get(container_key)
if isinstance(container, Mapping):
symbol = str(container.get("attack_symbol") or "").strip().upper()
if symbol:
return symbol
return ""


def _manual_review_source_title(
*,
panic_payload: Mapping[str, Any],
taco_payload: Mapping[str, Any],
locale: str,
) -> str:
panic_active = bool(panic_payload)
taco_active = bool(taco_payload)
if locale == "zh-CN":
if panic_active and taco_active:
return "事件缓和 + VIX 恐慌反转共振"
if panic_active:
return "VIX 恐慌反转"
if taco_active:
return "TACO 事件反弹"
return "机会观察"
if panic_active and taco_active:
return "event de-escalation + VIX panic reversal"
if panic_active:
return "VIX panic reversal"
if taco_active:
return "TACO event rebound"
return "opportunity watch"


def _append_panic_review_lines(lines: list[str], panic_payload: Mapping[str, Any], *, locale: str) -> None:
metrics = _nested_mapping(panic_payload, "metrics")
confirmation = _nested_mapping(panic_payload, "reversal_confirmation")
thresholds = _nested_mapping(confirmation, "thresholds")
benchmark = str(metrics.get("benchmark_symbol") or "benchmark").upper()
attack = str(metrics.get("attack_symbol") or "attack").upper()
if locale == "zh-CN":
lines.extend(
[
"触发原因:",
(
"- VIX 曾达到恐慌区间:"
f"{int(_as_float_or_none(thresholds.get('vix_high_lookback_days')) or 5)} 日高点 "
f"{_format_number(metrics.get('vix_lookback_high'))},阈值 "
f"{_format_number(thresholds.get('min_vix_high'))}"
),
(
"- VIX 已从高点回落:"
f"当前 {_format_number(metrics.get('vix'))},较高点回落 "
f"{_format_pct(metrics.get('vix_pullback_from_high'))}"
),
(
"- VIX 继续下降:"
f"前值 {_format_number(metrics.get('vix_previous'))},当前 {_format_number(metrics.get('vix'))}"
),
(
"- VIX/VIX3M = "
f"{_format_number(metrics.get('vix_vix3m_ratio'))},用于确认恐慌结构仍可观测"
),
(
f"- {benchmark} 3 日收益 {_format_pct(metrics.get('benchmark_3d_return'), signed=True)},"
f"从近 5 日低点反弹 {_format_pct(metrics.get('benchmark_rebound_from_recent_low'), signed=True)}"
),
f"- {attack} 从近 5 日低点反弹 {_format_pct(metrics.get('attack_rebound_from_recent_low'), signed=True)}",
]
)
return
lines.extend(
[
"Trigger evidence:",
(
"- VIX reached panic territory: "
f"{int(_as_float_or_none(thresholds.get('vix_high_lookback_days')) or 5)}-day high "
f"{_format_number(metrics.get('vix_lookback_high'))}; threshold "
f"{_format_number(thresholds.get('min_vix_high'))}"
),
(
"- VIX has pulled back from the high: "
f"current {_format_number(metrics.get('vix'))}, pullback "
f"{_format_pct(metrics.get('vix_pullback_from_high'))}"
),
(
"- VIX is still falling: "
f"previous {_format_number(metrics.get('vix_previous'))}, current {_format_number(metrics.get('vix'))}"
),
f"- VIX/VIX3M = {_format_number(metrics.get('vix_vix3m_ratio'))}",
(
f"- {benchmark} 3-day return {_format_pct(metrics.get('benchmark_3d_return'), signed=True)}; "
f"rebound from recent low {_format_pct(metrics.get('benchmark_rebound_from_recent_low'), signed=True)}"
),
f"- {attack} rebound from recent low {_format_pct(metrics.get('attack_rebound_from_recent_low'), signed=True)}",
]
)


def _append_taco_review_lines(lines: list[str], taco_payload: Mapping[str, Any], *, locale: str) -> None:
event = _nested_mapping(taco_payload, "selected_event")
confirmation = _nested_mapping(taco_payload, "rebound_confirmation")
benchmark = str(confirmation.get("benchmark_symbol") or "benchmark").upper()
attack = str(confirmation.get("attack_symbol") or "attack").upper()
if locale == "zh-CN":
lines.extend(
[
"事件:",
f"- 类型:{event.get('kind') or 'n/a'} / 区域:{event.get('region') or 'n/a'}",
f"- 日期:{event.get('event_date') or 'n/a'}",
f"- 标题:{event.get('title') or 'n/a'}",
f"- 来源:{event.get('source') or 'n/a'}",
"价格确认:",
f"- 事件后已过 {confirmation.get('trading_days_after_event', 'n/a')} 个交易日",
(
f"- {benchmark} 3 日收益 {_format_pct(confirmation.get('benchmark_3d_return'), signed=True)},"
f"从近 5 日低点反弹 "
f"{_format_pct(confirmation.get('benchmark_rebound_from_recent_low'), signed=True)}"
),
f"- {attack} 从近 5 日低点反弹 {_format_pct(confirmation.get('attack_rebound_from_recent_low'), signed=True)}",
]
)
return
lines.extend(
[
"Event:",
f"- Type: {event.get('kind') or 'n/a'} / region: {event.get('region') or 'n/a'}",
f"- Date: {event.get('event_date') or 'n/a'}",
f"- Title: {event.get('title') or 'n/a'}",
f"- Source: {event.get('source') or 'n/a'}",
"Price confirmation:",
f"- {confirmation.get('trading_days_after_event', 'n/a')} trading days after the event",
(
f"- {benchmark} 3-day return {_format_pct(confirmation.get('benchmark_3d_return'), signed=True)}; "
f"rebound from recent low "
f"{_format_pct(confirmation.get('benchmark_rebound_from_recent_low'), signed=True)}"
),
f"- {attack} rebound from recent low {_format_pct(confirmation.get('attack_rebound_from_recent_low'), signed=True)}",
]
)


def _format_manual_review_notification_message(
payload: Mapping[str, Any],
*,
locale: str,
target_label: str,
plugin: str,
as_of: str,
route: str,
) -> str | None:
if _payload_action(payload) != "notify_manual_review":
return None
panic_payload = _active_panic_payload(payload, plugin)
taco_payload = _active_taco_payload(payload, plugin)
if not panic_payload and not taco_payload:
return None

position_control = _nested_mapping(payload, "position_control")
vetoes = _message_reason_codes(position_control.get("vetoes") or _nested_mapping(payload, "arbiter").get("vetoes"))
attack_symbol = _review_attack_symbol(panic_payload, taco_payload) or target_label
source_title = _manual_review_source_title(panic_payload=panic_payload, taco_payload=taco_payload, locale=locale)
if locale == "zh-CN":
lines = [
f"【机会复核|{attack_symbol}|{source_title}】",
f"日期:{as_of or '未知日期'}",
"结论:触发人工复核,不自动加仓。",
(
f"仲裁:{plugin} = {route};"
f"{'crisis/macro 未 veto' if not vetoes else '存在 veto:' + _message_join(vetoes, locale)}。"
Comment on lines +1045 to +1046
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid claiming veto checks ran for standalone plugins

When this formatter is used for standalone taco_rebound_shadow or panic_reversal_shadow runs, there is no market-regime arbiter or position_control.vetoes, so an empty vetoes tuple means “not checked”, not “crisis/macro did not veto”. The new manual-review card therefore gives a false risk-control assurance for direct notification mounts; only market_regime_control payloads can make this claim.

Useful? React with 👍 / 👎.

),
"执行权限:只通知;不下单;不修改仓位。",
]
if position_control:
scalar_bits = []
if "taco_size_scalar" in position_control:
scalar_bits.append(f"taco_size_scalar = {_format_number(position_control.get('taco_size_scalar'))}")
if "panic_reversal_size_scalar" in position_control:
scalar_bits.append(
f"panic_reversal_size_scalar = {_format_number(position_control.get('panic_reversal_size_scalar'))}"
)
if scalar_bits:
lines.append("仓位权限:" + ";".join(scalar_bits) + "。")
if panic_payload:
_append_panic_review_lines(lines, panic_payload, locale=locale)
if taco_payload:
_append_taco_review_lines(lines, taco_payload, locale=locale)
lines.extend(
[
"人工复核建议:",
"- 只评估是否停止继续降风险或恢复观察,不作为自动买入信号。",
"- 若 crisis/macro 后续转为 risk_reduced 或 risk_off,本机会信号自动失效。",
]
)
return "\n".join(lines)

lines = [
f"[Opportunity Review | {attack_symbol} | {source_title}]",
f"Date: {as_of or 'unknown date'}",
"Conclusion: manual review triggered; no automatic position increase.",
f"Arbiter: {plugin} = {route}; {'crisis/macro did not veto' if not vetoes else 'vetoes: ' + _message_join(vetoes, locale)}.",
"Execution: notify only; no broker orders; no allocation mutation.",
]
if position_control:
scalar_bits = []
if "taco_size_scalar" in position_control:
scalar_bits.append(f"taco_size_scalar = {_format_number(position_control.get('taco_size_scalar'))}")
if "panic_reversal_size_scalar" in position_control:
scalar_bits.append(
f"panic_reversal_size_scalar = {_format_number(position_control.get('panic_reversal_size_scalar'))}"
)
if scalar_bits:
lines.append("Position authority: " + "; ".join(scalar_bits) + ".")
if panic_payload:
_append_panic_review_lines(lines, panic_payload, locale=locale)
if taco_payload:
_append_taco_review_lines(lines, taco_payload, locale=locale)
lines.extend(
[
"Manual review guidance:",
"- Review only whether to stop further de-risking or return to watch; this is not an automatic buy signal.",
"- If crisis/macro later moves to risk_reduced or risk_off, this opportunity signal is invalidated.",
]
)
return "\n".join(lines)


def _format_notification_message(
*,
locale: str,
Expand Down Expand Up @@ -898,7 +1182,15 @@ def _build_localized_messages(
locale: list(_localized_reason_labels(reason_codes, locale)) for locale in SUPPORTED_MESSAGE_LOCALES
}
notification_messages = {
locale: _format_notification_message(
locale: _format_manual_review_notification_message(
payload,
locale=locale,
target_label=target_label,
plugin=plugin,
as_of=as_of,
route=route,
)
or _format_notification_message(
locale=locale,
target_label=target_label,
target_type=target_type,
Expand Down
17 changes: 17 additions & 0 deletions tests/test_strategy_plugin_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,13 @@ def test_strategy_plugin_runner_can_enable_panic_reversal_inside_market_regime_c
assert payload["position_control"]["panic_reversal_size_scalar"] == 0.0
assert payload["position_control"]["taco_allowed"] is False
assert "panic_reversal:panic_reversal" in payload["position_control"]["reason_codes"]
zh_notification = payload["notification"]["localized_messages"]["zh-CN"]
assert "【机会复核|TQQQ|VIX 恐慌反转】" in zh_notification
assert "结论:触发人工复核,不自动加仓。" in zh_notification
assert "VIX 曾达到恐慌区间" in zh_notification
assert "VIX 已从高点回落" in zh_notification
assert "QQQ 3 日收益" in zh_notification
assert "panic_reversal_size_scalar = 0.00" in zh_notification


def test_strategy_plugin_runner_runs_general_market_regime_notification(tmp_path) -> None:
Expand Down Expand Up @@ -704,6 +711,12 @@ def test_strategy_plugin_runner_runs_taco_rebound_notification_mount_for_tqqq(tm
assert latest["rebound_confirmation"]["confirmed"] is True
assert latest["would_trade_if_enabled"] is False
assert "sleeve_suggestion" not in latest
zh_notification = latest["localized_messages"]["notification"]["zh-CN"]
assert "【机会复核|TQQQ|TACO 事件反弹】" in zh_notification
assert "结论:触发人工复核,不自动加仓。" in zh_notification
assert "事件:" in zh_notification
assert "价格确认:" in zh_notification
assert "人工复核建议:" in zh_notification


def test_strategy_plugin_runner_can_enable_taco_ai_audit_without_api_key(tmp_path, monkeypatch) -> None:
Expand Down Expand Up @@ -817,6 +830,10 @@ def test_strategy_plugin_runner_runs_panic_reversal_notification_mount_for_tqqq(
assert latest["execution_controls"]["position_control_allowed"] is False
assert latest["execution_controls"]["consumption_evidence_status"] == EVIDENCE_NOTIFICATION_ONLY
assert latest["localized_messages"]["labels"]["canonical_route"]["zh-CN"] == "恐慌反转"
zh_notification = latest["notification"]["localized_messages"]["zh-CN"]
assert "【机会复核|TQQQ|VIX 恐慌反转】" in zh_notification
assert "执行权限:只通知;不下单;不修改仓位。" in zh_notification
assert "TQQQ 从近 5 日低点反弹" in zh_notification


def test_strategy_plugin_runner_rejects_panic_reversal_for_soxl_strategy_mount(tmp_path) -> None:
Expand Down