diff --git a/README.md b/README.md index 56f850a..3919d5b 100644 --- a/README.md +++ b/README.md @@ -44,9 +44,9 @@ send notifications; plugin research and signal generation live here. - `market_regime_control`: unified deterministic facade for crisis, macro, and TACO signals. Only strategies with positive backtest evidence should mount position controls for automated consumption; SOXL/SOXX currently receives - broad macro/crisis signals as general notifications only. Stock/ETF rotation - strategies should consume the same artifact through their local risk-scaling - policy and keep TACO as notification-only. See the + broad macro/crisis signals through `notification_targets` only. Stock/ETF + rotation strategies should consume the same artifact through their local + risk-scaling policy and keep TACO as notification-only. See the [Market Regime Control design plan](docs/market-regime-control-plan.md). - `taco_rebound_shadow`: TQQQ-only event-rebound context notifier. It writes manual-review artifacts and never recommends position size or changes @@ -136,6 +136,8 @@ as `schema_version`, `canonical_route`, `suggested_action`, `reason_codes`, and logs only. `market_regime_control.notification` mirrors the localized notification text and reason labels so existing notification code can render a message without translating route/action codes itself. +General notification targets are configured under `notification_targets`, not as +synthetic strategies, and never receive position-control permission. ## Local Checks diff --git a/README.zh-CN.md b/README.zh-CN.md index b4e3a79..1b1ec24 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -26,7 +26,7 @@ Brokers、Schwab、LongBridge、Firstrade 等平台仓库只负责加载 artifac - `crisis_response_shadow`:面向杠杆美股策略的黑天鹅防守观察插件。它只写入 shadow-mode artifact,不调用券商接口。 可选启用 AI shadow audit:AI 只审计证据一致性和数据缺口,不改写确定性路线、不下单、不改仓位;默认优先尝试本机 Codex,失败后可走 OpenAI-compatible 或 Anthropic fallback endpoint。 - `macro_risk_governor`:面向 TQQQ 的确定性宏观降杠杆插件。它按价格趋势、实现波动、VIX 和信用 ETF 相对压力打分,输出 `leverage_scalar` / `risk_asset_scalar` 给显式 opt-in 的策略运行时消费。HY OAS、金融压力指数、五角大楼比萨指数、Fear & Greed、put/call、VVIX、SKEW、MOVE、收益率曲线、美元压力、safe-haven demand 等外部硬数据、OSINT、情绪或跨资产字段默认只作为 watch-only 证据,不进入可执行分数;只有显式研究开关开启后才允许外部压力字段参与自动分数。 -- `market_regime_control`:统一确定性 facade,汇总 crisis、macro 和 TACO 信号,输出版本化的 `notification` 和 `position_control`。只有经过回测证明自动消费有效的策略才挂载仓位控制;SOXL/SOXX 这类未通过统一宏观插件复核的高波动行业杠杆策略只接收通用通知,人工决定是否干预。股票/ETF 轮动策略通过本地风险缩放策略消费;TACO 在统一插件里保持通知-only,并会被危机和宏观降风险路线 veto。设计说明见 [Market Regime Control 统一插件方案](docs/market-regime-control-plan.zh-CN.md)。 +- `market_regime_control`:统一确定性 facade,汇总 crisis、macro 和 TACO 信号,输出版本化的 `notification` 和 `position_control`。只有经过回测证明自动消费有效的策略才挂载仓位控制;SOXL/SOXX 这类未通过统一宏观插件复核的高波动行业杠杆策略只通过 `notification_targets` 接收通用通知,人工决定是否干预。股票/ETF 轮动策略通过本地风险缩放策略消费;TACO 在统一插件里保持通知-only,并会被危机和宏观降风险路线 veto。设计说明见 [Market Regime Control 统一插件方案](docs/market-regime-control-plan.zh-CN.md)。 - `taco_rebound_shadow`:仅适用于 TQQQ 的事件反弹上下文通知插件。它只写入人工复核 artifact,不给仓位大小建议,也不改动配置或账户分配。缓和/降温事件会先保持 watch-only,只有事件后价格反弹确认通过后才触发人工复核通知,以减少过早抄底提醒。 该插件也可选启用同样的 shadow-only AI audit,但 AI 只复核事件来源和反弹证据质量。 - TACO panic-rebound 研究、组合回测和 overlay 对比也归属本仓库;snapshot pipeline 仓库只保留兼容入口。 @@ -103,6 +103,8 @@ qsp-build-taco-rebound-shadow-signal \ 策略和券商运行时的交易逻辑仍应只读取 `schema_version`、`canonical_route`、 `suggested_action`、`reason_codes` 和 `position_control` 等机器字段。中英文文案只用于通知界面和日志展示,不参与策略判断。`market_regime_control.notification` 会同步包含本地化通知文案和原因标签,方便现有通知代码直接渲染,不需要在策略仓库里重复翻译 route/action code。 +通用通知目标通过 `notification_targets` 配置,不再伪装成 synthetic strategy, +并且永远不会获得仓位控制权限。 ## 本地检查 diff --git a/docs/examples/strategy_plugins.example.toml b/docs/examples/strategy_plugins.example.toml index 31e805b..804ce41 100644 --- a/docs/examples/strategy_plugins.example.toml +++ b/docs/examples/strategy_plugins.example.toml @@ -41,15 +41,15 @@ taco_enabled = true [strategy_plugins.outputs] output_dir = "data/output/tqqq_growth_income/plugins/market_regime_control" -[[strategy_plugins]] -strategy = "market_regime_notification" +[[notification_targets]] +notification_target = "market_regime_notification" plugin = "market_regime_control" enabled = true # General market-regime notification. This artifact is not mounted into the # SOXL/SOXX strategy runtime; sector-levered SOXL keeps its own validated SOXX # volatility gate and humans decide whether broad macro/crisis notices matter. -[strategy_plugins.inputs] +[notification_targets.inputs] prices = "data/output/market_regime_control/input/soxl_price_history.csv" external_context = "data/output/market_regime_control/input/external_context.csv" event_set = "full" @@ -69,7 +69,7 @@ taco_enabled = false crisis_enabled = true macro_enabled = true -[strategy_plugins.outputs] +[notification_targets.outputs] output_dir = "data/output/market_regime_notification/plugins/market_regime_control" # Deprecated compatibility mounts. They remain runnable for historical diff --git a/docs/market-regime-control-plan.md b/docs/market-regime-control-plan.md index a312592..c3e3ccc 100644 --- a/docs/market-regime-control-plan.md +++ b/docs/market-regime-control-plan.md @@ -131,8 +131,10 @@ allowlist: SOXL/SOXX is not in the strategy-level `market_regime_control` consumption registry. It receives broad market-regime context through the general -`market_regime_notification` artifact to prevent accidental promotion from -notification into automated de-risking. +`notification_targets.market_regime_notification` artifact. That notification +target is not a strategy, cannot enter strategy runtime metadata, and cannot +affect position sizing; this prevents accidental promotion from notification +into automated de-risking. ## Indicator Tiers diff --git a/docs/market-regime-control-plan.zh-CN.md b/docs/market-regime-control-plan.zh-CN.md index f95a1db..8880e32 100644 --- a/docs/market-regime-control-plan.zh-CN.md +++ b/docs/market-regime-control-plan.zh-CN.md @@ -82,7 +82,10 @@ - `evidence_status`:记录该策略/插件组合是 `automation_approved`、`notification_only` 还是 `deprecated_compatibility`。 - `since_version`:记录该消费权限从哪个 runner schema 开始生效。 -SOXL/SOXX 不出现在 `market_regime_control` 的策略级消费 registry 中;它通过 `market_regime_notification` 接收通用通知,避免配置误用把通知信号升级成自动调仓。 +SOXL/SOXX 不出现在 `market_regime_control` 的策略级消费 registry 中;它通过 +`notification_targets.market_regime_notification` 接收通用通知。通用通知不是 +strategy,不允许进入策略 runtime metadata,也不能影响仓位,避免配置误用把通知 +信号升级成自动调仓。 当前观察指标分层: diff --git a/src/quant_strategy_plugins/strategy_plugin_runner.py b/src/quant_strategy_plugins/strategy_plugin_runner.py index 2913680..52796f4 100644 --- a/src/quant_strategy_plugins/strategy_plugin_runner.py +++ b/src/quant_strategy_plugins/strategy_plugin_runner.py @@ -33,7 +33,7 @@ ) DEFAULT_RUNNER_OUTPUT_DIR = "data/output/strategy_plugins" -GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY = "market_regime_notification" +GENERAL_MARKET_REGIME_NOTIFICATION_TARGET = "market_regime_notification" PLUGIN_CRISIS_RESPONSE_SHADOW = "crisis_response_shadow" PLUGIN_MARKET_REGIME_CONTROL = MARKET_REGIME_CONTROL_PROFILE PLUGIN_MACRO_RISK_GOVERNOR = MACRO_RISK_GOVERNOR_PROFILE @@ -71,6 +71,8 @@ class PluginRunResult: output_dir: str | None = None latest_signal_path: str | None = None message: str = "" + target_type: str = "strategy" + notification_target: str | None = None @dataclass(frozen=True) @@ -85,17 +87,19 @@ class PluginConsumptionPolicy: intended_strategy_role: str | None = None +@dataclass(frozen=True) +class PluginNotificationTargetPolicy: + plugin: str + notification_target: str + notification_allowed: bool + position_control_allowed: bool + evidence_status: str + since_version: str + description: str + notification_role: str + + PLUGIN_CONSUMPTION_POLICIES: tuple[PluginConsumptionPolicy, ...] = ( - PluginConsumptionPolicy( - plugin=PLUGIN_MARKET_REGIME_CONTROL, - strategy=GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY, - notification_allowed=True, - position_control_allowed=False, - evidence_status=EVIDENCE_NOTIFICATION_ONLY, - since_version="strategy_plugins.v1", - description="General market-regime notice. Not mounted into an automated strategy runtime.", - intended_strategy_role="general_market_regime_notification", - ), PluginConsumptionPolicy( plugin=PLUGIN_MARKET_REGIME_CONTROL, strategy="tqqq_growth_income", @@ -172,6 +176,21 @@ class PluginConsumptionPolicy: PLUGIN_CONSUMPTION_POLICY_REGISTRY: dict[tuple[str, str], PluginConsumptionPolicy] = { (policy.plugin, policy.strategy): policy for policy in PLUGIN_CONSUMPTION_POLICIES } +PLUGIN_NOTIFICATION_TARGET_POLICIES: tuple[PluginNotificationTargetPolicy, ...] = ( + PluginNotificationTargetPolicy( + plugin=PLUGIN_MARKET_REGIME_CONTROL, + notification_target=GENERAL_MARKET_REGIME_NOTIFICATION_TARGET, + notification_allowed=True, + position_control_allowed=False, + evidence_status=EVIDENCE_NOTIFICATION_ONLY, + since_version="strategy_plugins.v1", + description="General market-regime notice. Not mounted into an automated strategy runtime.", + notification_role="general_market_regime_notification", + ), +) +PLUGIN_NOTIFICATION_TARGET_POLICY_REGISTRY: dict[tuple[str, str], PluginNotificationTargetPolicy] = { + (policy.plugin, policy.notification_target): policy for policy in PLUGIN_NOTIFICATION_TARGET_POLICIES +} PLUGIN_COMPATIBLE_STRATEGIES: dict[str, tuple[str, ...]] = { plugin: tuple( policy.strategy @@ -180,6 +199,14 @@ class PluginConsumptionPolicy: ) for plugin in sorted({policy.plugin for policy in PLUGIN_CONSUMPTION_POLICIES}) } +PLUGIN_COMPATIBLE_NOTIFICATION_TARGETS: dict[str, tuple[str, ...]] = { + plugin: tuple( + policy.notification_target + for policy in PLUGIN_NOTIFICATION_TARGET_POLICIES + if policy.plugin == plugin and policy.notification_allowed + ) + for plugin in sorted({policy.plugin for policy in PLUGIN_NOTIFICATION_TARGET_POLICIES}) +} LOCALIZED_ROUTE_LABELS: dict[str, dict[str, str]] = { "blocked": {"en-US": "Blocked", "zh-CN": "已阻断"}, @@ -365,10 +392,28 @@ def _validate_plugin_strategy(plugin_name: str, strategy: str) -> None: ) +def _validate_plugin_notification_target(plugin_name: str, notification_target: str) -> None: + policy = PLUGIN_NOTIFICATION_TARGET_POLICY_REGISTRY.get((plugin_name, notification_target)) + if policy is None or not policy.notification_allowed: + compatible = PLUGIN_COMPATIBLE_NOTIFICATION_TARGETS.get(plugin_name, ()) + choices = ", ".join(compatible) if compatible else "(none)" + raise ValueError( + f"{plugin_name} is notification-target-limited and can only publish to: {choices}; " + f"got notification_target={notification_target!r}" + ) + + def _plugin_consumption_policy(plugin_name: str, strategy: str) -> PluginConsumptionPolicy | None: return PLUGIN_CONSUMPTION_POLICY_REGISTRY.get((plugin_name, strategy)) +def _plugin_notification_target_policy( + plugin_name: str, + notification_target: str, +) -> PluginNotificationTargetPolicy | None: + return PLUGIN_NOTIFICATION_TARGET_POLICY_REGISTRY.get((plugin_name, notification_target)) + + def _flatten_strategy_plugin_entry(entry: Mapping[str, Any]) -> dict[str, Any]: plugin_config = { key: value @@ -702,7 +747,8 @@ def _payload_should_notify(payload: Mapping[str, Any], route: str) -> bool: def _format_notification_message( *, locale: str, - strategy: str, + target_label: str, + target_type: str, plugin: str, as_of: str, route_label: str, @@ -713,13 +759,15 @@ def _format_notification_message( reason_text = _message_join(reason_labels, locale) if locale == "zh-CN": prefix = "需要通知" if should_notify else "无需通知" + scope = "通知目标" if target_type == "notification_target" else "策略" return ( - f"{prefix}:策略 {strategy} 的 {plugin} 在 {as_of or '未知日期'} 输出" + f"{prefix}:{scope} {target_label} 的 {plugin} 在 {as_of or '未知日期'} 输出" f"市场状态 {route_label},建议动作 {action_label},原因:{reason_text}。" ) prefix = "Notification required" if should_notify else "No notification required" + scope = "notification target" if target_type == "notification_target" else "strategy" return ( - f"{prefix}: {plugin} for strategy {strategy} produced market regime {route_label} " + f"{prefix}: {plugin} for {scope} {target_label} produced market regime {route_label} " f"on {as_of or 'unknown date'} with suggested action {action_label}. Reasons: {reason_text}." ) @@ -727,7 +775,8 @@ def _format_notification_message( def _format_log_message( *, locale: str, - strategy: str, + target_label: str, + target_type: str, plugin: str, as_of: str, route: str, @@ -740,12 +789,13 @@ def _format_log_message( code_text = _message_join(reason_codes, "en-US") label_text = _message_join(reason_labels, locale) if locale == "zh-CN": + target_key = "通知目标" if target_type == "notification_target" else "策略" return ( - f"策略={strategy} 插件={plugin} 日期={as_of or '未知'} 路线={route}({route_label}) " + f"{target_key}={target_label} 插件={plugin} 日期={as_of or '未知'} 路线={route}({route_label}) " f"动作={action}({action_label}) 原因码={code_text} 原因={label_text}" ) return ( - f"strategy={strategy} plugin={plugin} as_of={as_of or 'unknown'} route={route}({route_label}) " + f"target_type={target_type} target={target_label} plugin={plugin} as_of={as_of or 'unknown'} route={route}({route_label}) " f"action={action}({action_label}) reason_codes={code_text} reasons={label_text}" ) @@ -753,7 +803,8 @@ def _format_log_message( def _build_localized_messages( payload: Mapping[str, Any], *, - strategy: str, + strategy: str | None = None, + notification_target: str | None = None, plugin: str, ) -> dict[str, Any]: route = _payload_route(payload) or "unknown" @@ -761,6 +812,8 @@ def _build_localized_messages( reason_codes = _payload_reason_codes(payload) as_of = str(payload.get("as_of") or "").strip() should_notify = _payload_should_notify(payload, route) + target_type = "notification_target" if notification_target else "strategy" + target_label = str(notification_target or strategy or "").strip() route_labels = { locale: _localized_label(LOCALIZED_ROUTE_LABELS, route, locale) for locale in SUPPORTED_MESSAGE_LOCALES @@ -774,7 +827,8 @@ def _build_localized_messages( notification_messages = { locale: _format_notification_message( locale=locale, - strategy=strategy, + target_label=target_label, + target_type=target_type, plugin=plugin, as_of=as_of, route_label=route_labels[locale], @@ -787,7 +841,8 @@ def _build_localized_messages( log_messages = { locale: _format_log_message( locale=locale, - strategy=strategy, + target_label=target_label, + target_type=target_type, plugin=plugin, as_of=as_of, route=route, @@ -816,7 +871,8 @@ def _build_localized_messages( def _build_log_record( payload: Mapping[str, Any], *, - strategy: str, + strategy: str | None = None, + notification_target: str | None = None, plugin: str, mode: str, localized_messages: Mapping[str, Any], @@ -827,7 +883,9 @@ def _build_log_record( "schema_version": STRATEGY_PLUGIN_LOG_SCHEMA_VERSION, "event": "strategy_plugin_signal", "namespace": str(execution_controls.get("log_namespace") or plugin), - "strategy": strategy, + "target_type": "notification_target" if notification_target else "strategy", + "strategy": strategy or "", + "notification_target": notification_target or "", "plugin": plugin, "mode": mode, "as_of": str(payload.get("as_of") or "").strip(), @@ -842,19 +900,32 @@ def _build_log_record( def _apply_plugin_contract( payload: Mapping[str, Any], *, - strategy: str, + strategy: str | None = None, + notification_target: str | None = None, plugin: str, mode: str, consumption_policy: PluginConsumptionPolicy | None = None, + notification_target_policy: PluginNotificationTargetPolicy | None = None, ) -> dict[str, Any]: contracted_payload = dict(payload) - contracted_payload["strategy"] = strategy + if strategy: + contracted_payload["target_type"] = "strategy" + contracted_payload["strategy"] = strategy + contracted_payload.pop("notification_target", None) + elif notification_target: + contracted_payload["target_type"] = "notification_target" + contracted_payload["notification_target"] = notification_target + contracted_payload.pop("strategy", None) + else: + raise ValueError("plugin contract requires either strategy or notification_target") contracted_payload["plugin"] = plugin contracted_payload["mode"] = mode contracted_payload["configured_mode"] = mode contracted_payload["effective_mode"] = mode if consumption_policy is not None: contracted_payload["consumption_policy"] = asdict(consumption_policy) + if notification_target_policy is not None: + contracted_payload["notification_target_policy"] = asdict(notification_target_policy) execution_controls = dict(contracted_payload.get("execution_controls") or {}) execution_controls.update(_mode_execution_controls(mode)) @@ -866,11 +937,16 @@ def _apply_plugin_contract( execution_controls["notification_allowed"] = bool(consumption_policy.notification_allowed) execution_controls["position_control_allowed"] = bool(consumption_policy.position_control_allowed) execution_controls["consumption_evidence_status"] = consumption_policy.evidence_status - if consumption_policy is not None and consumption_policy.intended_strategy_role == "general_market_regime_notification": + if notification_target_policy is not None: + execution_controls["notification_allowed"] = bool(notification_target_policy.notification_allowed) + execution_controls["position_control_allowed"] = False + execution_controls["consumption_evidence_status"] = notification_target_policy.evidence_status + execution_controls["notification_role"] = notification_target_policy.notification_role execution_controls["capital_impact"] = "notification_only" execution_controls["strategy_runtime_metadata_allowed"] = False execution_controls["position_control_shadow_only"] = True - execution_controls["intended_strategy_role"] = "general_market_regime_notification" + execution_controls["notification_target"] = notification_target + execution_controls["target_type"] = "notification_target" execution_controls["mode_note"] = ( "Mode is the platform behavior contract; this repository writes artifacts and does not call brokers" ) @@ -882,12 +958,14 @@ def _apply_plugin_contract( localized_messages = _build_localized_messages( contracted_payload, strategy=strategy, + notification_target=notification_target, plugin=plugin, ) contracted_payload["localized_messages"] = localized_messages contracted_payload["log_record"] = _build_log_record( contracted_payload, strategy=strategy, + notification_target=notification_target, plugin=plugin, mode=mode, localized_messages=localized_messages, @@ -999,6 +1077,60 @@ def _run_table_strategy_plugin( ) +def _run_table_notification_target_plugin( + plugin_config: Mapping[str, Any], + default_mode: str, + spec: PluginExecutionSpec, +) -> PluginRunResult: + notification_target = _safe_scope_name(plugin_config.get("notification_target"), field="notification_target") + plugin = _safe_scope_name(plugin_config.get("plugin", spec.default_plugin), field="plugin") + mode = _plugin_mode(plugin_config, default_mode) + output_dir = str(plugin_config.get("output_dir") or _default_plugin_output_dir(notification_target, plugin)).strip() + enabled = _as_bool(plugin_config.get("enabled"), default=True) + if not enabled: + return PluginRunResult( + strategy="", + plugin=plugin, + enabled=False, + mode=mode, + effective_mode=None, + status="skipped", + output_dir=output_dir, + message="plugin disabled", + target_type="notification_target", + notification_target=notification_target, + ) + _validate_plugin_mode(plugin, mode) + _validate_plugin_notification_target(plugin, notification_target) + notification_target_policy = _plugin_notification_target_policy(plugin, notification_target) + + prices_path = str(plugin_config.get("prices", "")).strip() + if not prices_path: + raise ValueError(f"{plugin} for notification_target={notification_target} requires a prices path") + payload = spec.build_payload(read_table(prices_path), plugin_config) + payload = _apply_plugin_contract( + payload, + notification_target=notification_target, + plugin=plugin, + mode=mode, + notification_target_policy=notification_target_policy, + ) + paths = spec.write_outputs(payload, output_dir) + return PluginRunResult( + strategy="", + plugin=plugin, + enabled=True, + mode=mode, + effective_mode=mode, + status="ok", + output_dir=output_dir, + latest_signal_path=str(paths["latest_signal"]), + message=f"route={payload['canonical_route']} action={payload['suggested_action']}", + target_type="notification_target", + notification_target=notification_target, + ) + + CRISIS_RESPONSE_SHADOW_SPEC = PluginExecutionSpec( default_plugin=PLUGIN_CRISIS_RESPONSE_SHADOW, build_payload=_build_crisis_response_payload, @@ -1043,6 +1175,12 @@ def run_market_regime_control_plugin(plugin_config: Mapping[str, Any], default_m PLUGIN_MACRO_RISK_GOVERNOR: run_macro_risk_governor_plugin, PLUGIN_TACO_REBOUND_SHADOW: run_taco_rebound_shadow_plugin, } +PLUGIN_SPECS: dict[str, PluginExecutionSpec] = { + PLUGIN_CRISIS_RESPONSE_SHADOW: CRISIS_RESPONSE_SHADOW_SPEC, + PLUGIN_MARKET_REGIME_CONTROL: MARKET_REGIME_CONTROL_SPEC, + PLUGIN_MACRO_RISK_GOVERNOR: MACRO_RISK_GOVERNOR_SPEC, + PLUGIN_TACO_REBOUND_SHADOW: TACO_REBOUND_SHADOW_SPEC, +} def _strategy_plugin_entries( @@ -1051,7 +1189,9 @@ def _strategy_plugin_entries( selected_plugins: Sequence[str] | None = None, selected_strategies: Sequence[str] | None = None, ) -> tuple[dict[str, Any], ...]: - entries = config.get("strategy_plugins", ()) + entries = config.get("strategy_plugins", []) + if entries is None: + entries = [] if not isinstance(entries, list): raise ValueError("strategy_plugins config must be an array of tables") @@ -1076,11 +1216,45 @@ def _strategy_plugin_entries( return tuple(selected) +def _notification_target_entries( + config: Mapping[str, Any], + *, + selected_plugins: Sequence[str] | None = None, + selected_notification_targets: Sequence[str] | None = None, +) -> tuple[dict[str, Any], ...]: + entries = config.get("notification_targets", []) + if entries is None: + entries = [] + if not isinstance(entries, list): + raise ValueError("notification_targets config must be an array of tables") + + plugin_filter = set(_as_str_tuple(selected_plugins)) + target_filter = set(_as_str_tuple(selected_notification_targets)) + selected: list[dict[str, Any]] = [] + for entry in entries: + if not isinstance(entry, Mapping): + raise ValueError("each notification_targets entry must be a table") + plugin_config = _flatten_strategy_plugin_entry(entry) + notification_target = _safe_scope_name(plugin_config.get("notification_target"), field="notification_target") + plugin = _safe_scope_name(plugin_config.get("plugin"), field="plugin") + if plugin_filter and plugin not in plugin_filter: + continue + if target_filter and notification_target not in target_filter: + continue + if _as_bool(plugin_config.get("enabled"), default=True): + _validate_plugin_notification_target(plugin, notification_target) + plugin_config["notification_target"] = notification_target + plugin_config["plugin"] = plugin + selected.append(plugin_config) + return tuple(selected) + + def run_configured_plugins( config: Mapping[str, Any], *, selected_plugins: Sequence[str] | None = None, selected_strategies: Sequence[str] | None = None, + selected_notification_targets: Sequence[str] | None = None, ) -> dict[str, Any]: default_mode = str(config.get("default_mode", SHADOW_MODE)).strip().lower() plugin_configs = _strategy_plugin_entries( @@ -1088,6 +1262,11 @@ def run_configured_plugins( selected_plugins=selected_plugins, selected_strategies=selected_strategies, ) + notification_target_configs = _notification_target_entries( + config, + selected_plugins=selected_plugins, + selected_notification_targets=selected_notification_targets, + ) results: list[PluginRunResult] = [] for plugin_config in plugin_configs: @@ -1095,12 +1274,21 @@ def run_configured_plugins( if plugin not in PLUGIN_RUNNERS: raise ValueError(f"unsupported plugin: {plugin}") results.append(PLUGIN_RUNNERS[plugin](plugin_config, default_mode)) + notification_target_results: list[PluginRunResult] = [] + for plugin_config in notification_target_configs: + plugin = str(plugin_config["plugin"]) + if plugin not in PLUGIN_RUNNERS: + raise ValueError(f"unsupported plugin: {plugin}") + notification_target_results.append( + _run_table_notification_target_plugin(plugin_config, default_mode, PLUGIN_SPECS[plugin]) + ) output_dir = Path(str(config.get("output_dir", DEFAULT_RUNNER_OUTPUT_DIR)).strip()) summary = { "schema_version": "strategy_plugins.v1", "default_mode": default_mode, "strategy_plugins": [asdict(result) for result in results], + "notification_targets": [asdict(result) for result in notification_target_results], } write_json(output_dir / "latest_run.json", summary) return summary @@ -1108,9 +1296,10 @@ def run_configured_plugins( def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run configured sidecar strategy plugins.") - parser.add_argument("--config", required=True, help="TOML config file listing strategy-scoped sidecar plugins") + parser.add_argument("--config", required=True, help="TOML config file listing strategy plugins and notification targets") parser.add_argument("--plugins", default=None, help="Optional comma-separated plugin allowlist") parser.add_argument("--strategies", default=None, help="Optional comma-separated strategy allowlist") + parser.add_argument("--notification-targets", default=None, help="Optional comma-separated notification target allowlist") return parser @@ -1121,30 +1310,40 @@ def main(argv: list[str] | None = None) -> int: config, selected_plugins=_as_str_tuple(args.plugins) or None, selected_strategies=_as_str_tuple(args.strategies) or None, + selected_notification_targets=_as_str_tuple(args.notification_targets) or None, ) for result in summary["strategy_plugins"]: print( f"{result['strategy']}:{result['plugin']} {result['status']} mode={result['mode']} " f"latest={result.get('latest_signal_path') or ''} {result.get('message') or ''}".rstrip() ) + for result in summary["notification_targets"]: + print( + f"{result['notification_target']}:{result['plugin']} {result['status']} mode={result['mode']} " + f"latest={result.get('latest_signal_path') or ''} {result.get('message') or ''}".rstrip() + ) return 0 __all__ = [ - "GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY", + "GENERAL_MARKET_REGIME_NOTIFICATION_TARGET", "PLUGIN_CRISIS_RESPONSE_SHADOW", "PLUGIN_MARKET_REGIME_CONTROL", "PLUGIN_MACRO_RISK_GOVERNOR", "PLUGIN_TACO_REBOUND_SHADOW", "PLUGIN_COMPATIBLE_STRATEGIES", + "PLUGIN_COMPATIBLE_NOTIFICATION_TARGETS", "PLUGIN_CONSUMPTION_POLICIES", "PLUGIN_CONSUMPTION_POLICY_REGISTRY", "PLUGIN_DEPRECATED_SUCCESSORS", + "PLUGIN_NOTIFICATION_TARGET_POLICIES", + "PLUGIN_NOTIFICATION_TARGET_POLICY_REGISTRY", "PLUGIN_RESEARCH_ONLY_REASONS", "PLUGIN_SCHEMA_VERSIONS", "STRATEGY_PLUGIN_LOG_SCHEMA_VERSION", "STRATEGY_PLUGIN_MESSAGE_SCHEMA_VERSION", "PluginConsumptionPolicy", + "PluginNotificationTargetPolicy", "PluginRunResult", "load_plugin_config", "main", diff --git a/tests/test_strategy_plugin_runner.py b/tests/test_strategy_plugin_runner.py index 79ee338..20ac01c 100644 --- a/tests/test_strategy_plugin_runner.py +++ b/tests/test_strategy_plugin_runner.py @@ -10,12 +10,14 @@ from quant_strategy_plugins.strategy_plugin_runner import ( EVIDENCE_AUTOMATION_APPROVED, EVIDENCE_NOTIFICATION_ONLY, - GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY, + GENERAL_MARKET_REGIME_NOTIFICATION_TARGET, + PLUGIN_COMPATIBLE_NOTIFICATION_TARGETS, PLUGIN_COMPATIBLE_STRATEGIES, PLUGIN_CONSUMPTION_POLICY_REGISTRY, PLUGIN_CRISIS_RESPONSE_SHADOW, PLUGIN_DEPRECATED_SUCCESSORS, PLUGIN_MARKET_REGIME_CONTROL, + PLUGIN_NOTIFICATION_TARGET_POLICY_REGISTRY, PLUGIN_MACRO_RISK_GOVERNOR, PLUGIN_SCHEMA_VERSIONS, PLUGIN_TACO_REBOUND_SHADOW, @@ -348,14 +350,14 @@ def test_strategy_plugin_runner_runs_unified_market_regime_control_for_tqqq(tmp_ def test_strategy_plugin_runner_runs_general_market_regime_notification(tmp_path) -> None: prices_path = tmp_path / "market_regime_prices.csv" - output_dir = tmp_path / GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY / "plugins" / PLUGIN_MARKET_REGIME_CONTROL + output_dir = tmp_path / GENERAL_MARKET_REGIME_NOTIFICATION_TARGET / "plugins" / PLUGIN_MARKET_REGIME_CONTROL _soxl_quiet_prices().to_csv(prices_path, index=False) config = { "output_dir": str(tmp_path / "runner"), "default_mode": "shadow", - "strategy_plugins": [ + "notification_targets": [ { - "strategy": GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY, + "notification_target": GENERAL_MARKET_REGIME_NOTIFICATION_TARGET, "plugin": PLUGIN_MARKET_REGIME_CONTROL, "enabled": True, "inputs": { @@ -374,12 +376,17 @@ def test_strategy_plugin_runner_runs_general_market_regime_notification(tmp_path summary = run_configured_plugins(config) - result = summary["strategy_plugins"][0] - assert result["strategy"] == GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY + assert summary["strategy_plugins"] == [] + result = summary["notification_targets"][0] + assert result["strategy"] == "" + assert result["target_type"] == "notification_target" + assert result["notification_target"] == GENERAL_MARKET_REGIME_NOTIFICATION_TARGET assert result["plugin"] == PLUGIN_MARKET_REGIME_CONTROL assert result["status"] == "ok" payload = json.loads((output_dir / "latest_signal.json").read_text(encoding="utf-8")) - assert payload["strategy"] == GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY + assert "strategy" not in payload + assert payload["target_type"] == "notification_target" + assert payload["notification_target"] == GENERAL_MARKET_REGIME_NOTIFICATION_TARGET assert payload["plugin"] == PLUGIN_MARKET_REGIME_CONTROL assert payload["schema_version"] in PLUGIN_SCHEMA_VERSIONS[PLUGIN_MARKET_REGIME_CONTROL] assert payload["canonical_route"] == "no_action" @@ -387,8 +394,9 @@ def test_strategy_plugin_runner_runs_general_market_regime_notification(tmp_path assert payload["execution_controls"]["strategy_runtime_metadata_allowed"] is False assert payload["execution_controls"]["position_control_allowed"] is False assert payload["execution_controls"]["consumption_evidence_status"] == EVIDENCE_NOTIFICATION_ONLY - assert payload["consumption_policy"]["strategy"] == GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY + assert payload["notification_target_policy"]["notification_target"] == GENERAL_MARKET_REGIME_NOTIFICATION_TARGET assert payload["notification"]["localized_messages"]["en-US"].startswith("No notification required") + assert "notification target" in payload["notification"]["localized_messages"]["en-US"] assert payload["log_record"]["localized_messages"]["zh-CN"] @@ -414,13 +422,15 @@ def test_strategy_plugin_runner_rejects_soxl_market_regime_control_mount(tmp_pat def test_strategy_plugin_runner_contract_registry_prefers_unified_plugin() -> None: assert set(PLUGIN_COMPATIBLE_STRATEGIES[PLUGIN_MARKET_REGIME_CONTROL]) == { - GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY, STRATEGY_NAME, "global_etf_rotation", "russell_1000_multi_factor_defensive", "tech_communication_pullback_enhancement", "mega_cap_leader_rotation_top50_balanced", } + assert set(PLUGIN_COMPATIBLE_NOTIFICATION_TARGETS[PLUGIN_MARKET_REGIME_CONTROL]) == { + GENERAL_MARKET_REGIME_NOTIFICATION_TARGET, + } assert PLUGIN_SCHEMA_VERSIONS[PLUGIN_MARKET_REGIME_CONTROL] == ("market_regime_control.v1",) assert PLUGIN_DEPRECATED_SUCCESSORS[PLUGIN_CRISIS_RESPONSE_SHADOW] == PLUGIN_MARKET_REGIME_CONTROL assert PLUGIN_DEPRECATED_SUCCESSORS[PLUGIN_MACRO_RISK_GOVERNOR] == PLUGIN_MARKET_REGIME_CONTROL @@ -429,8 +439,8 @@ def test_strategy_plugin_runner_contract_registry_prefers_unified_plugin() -> No PLUGIN_MARKET_REGIME_CONTROL, SOXL_STRATEGY_NAME, ) not in PLUGIN_CONSUMPTION_POLICY_REGISTRY - assert PLUGIN_CONSUMPTION_POLICY_REGISTRY[ - (PLUGIN_MARKET_REGIME_CONTROL, GENERAL_MARKET_REGIME_NOTIFICATION_STRATEGY) + assert PLUGIN_NOTIFICATION_TARGET_POLICY_REGISTRY[ + (PLUGIN_MARKET_REGIME_CONTROL, GENERAL_MARKET_REGIME_NOTIFICATION_TARGET) ].position_control_allowed is False assert PLUGIN_CONSUMPTION_POLICY_REGISTRY[ (PLUGIN_MARKET_REGIME_CONTROL, STRATEGY_NAME) @@ -810,3 +820,8 @@ def test_strategy_plugin_runner_example_config_uses_default_mode_without_duplica assert config["strategy_plugins"][0]["outputs"]["output_dir"].endswith( "tqqq_growth_income/plugins/market_regime_control" ) + assert config["notification_targets"][0]["notification_target"] == GENERAL_MARKET_REGIME_NOTIFICATION_TARGET + assert "strategy" not in config["notification_targets"][0] + assert config["notification_targets"][0]["outputs"]["output_dir"].endswith( + "market_regime_notification/plugins/market_regime_control" + )