From 9e38a947a980a1e2a5ea80ad88ae2f1d6e68f76c Mon Sep 17 00:00:00 2001 From: Jochen Hoenle <173445474+hoe-jo@users.noreply.github.com> Date: Tue, 30 Jun 2026 08:18:23 +0200 Subject: [PATCH] [rules score] rework rendering of safety analysis - Add Overview of all Failuremodes - Add Details Card for each Failuremode - Add Overview of all Control Measures --- MODULE.bazel | 2 +- bazel/rules/rules_score/BUILD | 16 +- .../_assets/safety_analysis_doc_pipeline.puml | 131 +++-- .../docs/_assets/tooling_chain.puml | 10 +- .../docs/_assets/traceability_dataflow.puml | 2 +- .../rules_score/docs/tooling_architecture.rst | 92 ++- .../examples/seooc/safety_analysis/BUILD | 1 + .../sample_fmea_control_measures.trlc | 12 + .../sample_fmea_failure_modes.trlc | 9 + .../seooc/safety_analysis/sample_fta2.puml | 27 + bazel/rules/rules_score/private/fmea.bzl | 391 +++++-------- bazel/rules/rules_score/src/fmea_assembler.py | 381 ++++++++++++ .../rules_score/src/safety_analysis_tools.py | 291 --------- .../rules_score/templates/conf.template.py | 31 +- .../rules_score/templates/fmea.template.rst | 9 +- bazel/rules/rules_score/test/BUILD | 8 +- bazel/rules/rules_score/test/MODULE.bazel | 2 +- .../fixtures/image_requirements/schema.rsl | 2 +- .../rules_score/test/test_fmea_assembler.py | 365 ++++++++++++ .../test/test_safety_analysis_tools.py | 363 ------------ plantuml/parser/puml_cli/BUILD | 4 + plantuml/parser/puml_cli/src/main.rs | 246 +++++++- plantuml/parser/puml_fta/BUILD | 42 ++ plantuml/parser/puml_fta/src/lib.rs | 551 ++++++++++++++++++ plantuml/parser/puml_parser/src/lib.rs | 4 +- .../puml_parser/src/preprocessor/src/lib.rs | 5 +- .../src/preprocessor/src/procedure/lib.rs | 5 +- .../src/procedure/procedure_ast.rs | 4 + .../src/procedure/procedure_parser.rs | 7 +- 29 files changed, 2002 insertions(+), 1011 deletions(-) create mode 100644 bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fta2.puml create mode 100644 bazel/rules/rules_score/src/fmea_assembler.py delete mode 100644 bazel/rules/rules_score/src/safety_analysis_tools.py create mode 100644 bazel/rules/rules_score/test/test_fmea_assembler.py delete mode 100644 bazel/rules/rules_score/test/test_safety_analysis_tools.py create mode 100644 plantuml/parser/puml_fta/BUILD create mode 100644 plantuml/parser/puml_fta/src/lib.rs diff --git a/MODULE.bazel b/MODULE.bazel index 8aaee15a..33938412 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -309,7 +309,7 @@ use_repo(pip, "manual_analysis_deps") bazel_dep(name = "trlc", version = "0.0.0") git_override( module_name = "trlc", - commit = "c4c531b9d667085daa09dfc1590edacc314bfda4", + commit = "8d25f639ff44976893d7866ba421a04db5698ebe", remote = "https://github.com/bmw-software-engineering/trlc.git", ) diff --git a/bazel/rules/rules_score/BUILD b/bazel/rules/rules_score/BUILD index 8f958492..a2a4101d 100644 --- a/bazel/rules/rules_score/BUILD +++ b/bazel/rules/rules_score/BUILD @@ -75,13 +75,17 @@ py_binary( visibility = ["//visibility:public"], ) -# Safety analysis tools (preprocess FTA PlantUML + extract lobster traceability items) +# FMEA page assembler: builds the failure-mode-centric fmea.rst body in-process +# via the extended TRLCRST library and the FTA chains JSON from puml_cli. py_binary( - name = "safety_analysis_tools", - srcs = ["src/safety_analysis_tools.py"], + name = "fmea_assembler", + srcs = ["src/fmea_assembler.py"], imports = ["src"], - main = "src/safety_analysis_tools.py", + main = "src/fmea_assembler.py", visibility = ["//visibility:public"], + deps = [ + "@trlc//tools/trlc_rst:trlc_rst_lib", + ], ) # AoU forwarding filter: filters received AoU lobster entries for chain-forwarding @@ -187,6 +191,10 @@ py_binary( srcs = ["src/sphinx_wrapper.py"], data = [ "//tools/sphinx:plantuml", + # Ship the FTA metamodel in the docs-build runfiles so conf.py can put it + # on PlantUML's include path; FTA diagrams keep ``!include + # fta_metamodel.puml`` and resolve it even under -pipe rendering. + "//plantuml:fta_metamodel", ], env = { "SOURCE_DIRECTORY": "", diff --git a/bazel/rules/rules_score/docs/_assets/safety_analysis_doc_pipeline.puml b/bazel/rules/rules_score/docs/_assets/safety_analysis_doc_pipeline.puml index c1164b3b..21884adf 100644 --- a/bazel/rules/rules_score/docs/_assets/safety_analysis_doc_pipeline.puml +++ b/bazel/rules/rules_score/docs/_assets/safety_analysis_doc_pipeline.puml @@ -13,76 +13,103 @@ @startuml safety_analysis_doc_pipeline -' Data flow from safety analysis source files through Bazel rules into the -' Sphinx staging tree. +' Component view of the FMEA build: input artifacts (authored + tooling +' defaults) flow through three in-process tool actions of the ``fmea`` rule into +' the generated files, the providers, and finally the Sphinx staging tree. skinparam linetype ortho skinparam ArrowFontSize 10 skinparam defaultTextAlignment center -skinparam nodesep 40 -skinparam ranksep 50 +skinparam nodesep 30 +skinparam ranksep 45 skinparam rectangle { BackgroundColor<> #EFF6FB BorderColor<> #0066B1 + BackgroundColor<> #E3F2FD + BorderColor<> #1565C0 BackgroundColor<> #FFF3E0 BorderColor<> #EF6C00 - BackgroundColor<> #E8F5E9 - BorderColor<> #2E7D32 BackgroundColor<> #F3E5F5 BorderColor<> #7B1FA2 BackgroundColor<> #FFFDE7 BorderColor<> #F9A825 } +skinparam component { + BackgroundColor<> #E8F5E9 + BorderColor<> #2E7D32 +} -' ── Inputs ──────────────────────────────────────────────────────────────────── -rectangle "failuremodes.trlc\ncontrolmeasures.trlc" <> as trlc -rectangle "fta_*.puml" <> as fta -rectangle "dfa.rst" <> as dfa - -' ── fmea rule ───────────────────────────────────────────────────────────────── -rectangle "**fmea**" <> as fmea - -' ── fmea generated files ────────────────────────────────────────────────────── -rectangle "fmea.rst" <> as fmea_rst -rectangle "failuremodes.inc\ncontrolmeasures.inc" <> as inc_files -rectangle "fta_*.puml (inlined)\nroot_causes.lobster" <> as puml_proc -rectangle "detail_*.rst" <> as detail_rst - -' ── fmea SphinxSourcesInfo ──────────────────────────────────────────────────── -rectangle "SphinxSourcesInfo\n──────────────────\nsrcs: fmea.rst\ndeps: fmea.rst + *.inc + *.puml\naux_srcs: detail_*.rst" <> as fmea_ssi - -' ── dependability_analysis rule ─────────────────────────────────────────────── -rectangle "**dependability_analysis**" <> as da - -' ── dependability_analysis SphinxSourcesInfo ───────────────────────────────── -rectangle "SphinxSourcesInfo\n──────────────────\nsrcs: dfa.rst + fmea.rst\ndeps: + *.inc + *.puml\naux_srcs: detail_*.rst" <> as da_ssi - -' ── dependable_element rule ─────────────────────────────────────────────────── -rectangle "**dependable_element**" <> as de - -' ── Sphinx staging tree ─────────────────────────────────────────────────────── -rectangle "dependability_analysis/\n dfa.rst ← toctree\n fmea.rst ← toctree\n failuremodes.inc\n controlmeasures.inc\n fta_*.puml\n detail_*.rst ← sub-pages" <> as stage - -' ── Edges ───────────────────────────────────────────────────────────────────── -trlc --> fmea : trlc_rst\nlobster-trlc -fta --> fmea : safety_analysis_tools - -fmea --> fmea_rst -fmea --> inc_files -fmea --> puml_proc -fmea --> detail_rst -fmea_rst --> fmea_ssi -inc_files --> fmea_ssi -puml_proc --> fmea_ssi -detail_rst --> fmea_ssi +' ── Input artifacts ────────────────────────────────────────────────────────── +package "Authored by the component team" { + rectangle "failuremodes.trlc\n(FailureMode records)" <> as fm_trlc + rectangle "controlmeasures.trlc\n(ControlMeasure records)" <> as cm_trlc + rectangle "fta_*.puml\n(root_causes attr)" <> as fta_puml +} +package "Tooling defaults (rules_score / ScoreReq)" { + rectangle "ScoreReq *.rsl\n(spec attr)" <> as rsl + rectangle "fta_metamodel.puml\n(on PlantUML include path)" <> as meta + rectangle "fmea.template.rst\n({body})" <> as tmpl + rectangle "fm/cm lobster\nconfigs" <> as lcfg +} -dfa --> da -fmea_ssi --> da : merge +' ── fmea rule: three tool actions ──────────────────────────────────────────── +package "fmea rule actions" { + component "puml_cli (FTA mode)\n--fta-output-dir\n[crate: puml_fta]" <> as puml + component "fmea_assembler\n[lib: TRLCRST]" <> as asm + component "lobster-trlc x2" <> as ltrlc +} -da --> da_ssi +' ── Generated files ────────────────────────────────────────────────────────── +package "Generated files" { + rectangle "fta_*.puml\n(authored, symlinked)" <> as puml_inl + rectangle "fta_chains.json" <> as chains + rectangle "root_causes.lobster" <> as rc_lob + rectangle "fmea.rst" <> as fmea_rst + rectangle "failuremodes.lobster\ncontrolmeasures.lobster" <> as fmcm_lob +} -da_ssi --> de : dependability_analysis attr -de --> stage : symlink srcs+deps\nsymlink aux_srcs\n(no outer toctree entry) +' ── Providers ──────────────────────────────────────────────────────────────── +rectangle "SphinxSourcesInfo\n────────────────\nsrcs: fmea.rst\ndeps: fmea.rst\naux_srcs: fta_*.puml" <> as ssi +rectangle "AnalysisInfo.lobster_files\n────────────────\nfailuremodes.lobster\ncontrolmeasures.lobster\nroot_causes.lobster" <> as ai + +' ── Downstream rules + staging ─────────────────────────────────────────────── +component "dependability_analysis" <> as da +component "dependable_element" <> as de +rectangle "dependability_analysis/\n dfa.rst <- toctree\n fmea.rst <- toctree\n fta_*.puml (.. uml::)" <> as stage + +' ── Edges: inputs -> tools ─────────────────────────────────────────────────── +fta_puml --> puml : parse macro calls + +chains --> asm +fm_trlc --> asm +cm_trlc --> asm +rsl --> asm : import resolution +tmpl --> asm + +fm_trlc --> ltrlc +cm_trlc --> ltrlc +rsl --> ltrlc +lcfg --> ltrlc + +' ── Edges: tools -> generated ──────────────────────────────────────────────── +fta_puml --> puml_inl : symlinked beside fmea.rst +meta --> stage : on PlantUML include path +puml --> chains +puml --> rc_lob +asm --> fmea_rst +ltrlc --> fmcm_lob + +' ── Edges: generated -> providers ──────────────────────────────────────────── +fmea_rst --> ssi +puml_inl --> ssi +rc_lob --> ai +fmcm_lob --> ai + +' ── Edges: providers -> downstream -> staging ──────────────────────────────── +ssi --> da : merge SphinxSourcesInfo +ai --> da : merge lobster_files +da --> de +de --> stage : symlink srcs + aux_srcs\n(aux not indexed in toctree) @enduml diff --git a/bazel/rules/rules_score/docs/_assets/tooling_chain.puml b/bazel/rules/rules_score/docs/_assets/tooling_chain.puml index 98300d5f..0db400a7 100644 --- a/bazel/rules/rules_score/docs/_assets/tooling_chain.puml +++ b/bazel/rules/rules_score/docs/_assets/tooling_chain.puml @@ -40,10 +40,11 @@ rectangle "dependable_element" <> as de rectangle "sphinx_module" <> as sphinx ' ── Tools ───────────────────────────────────────────────────────────────────── -rectangle "**TRLC**\ntrlc parser + trlc_rst\n(.trlc/.rsl -> .rst/.inc)" <> as trlc +rectangle "**TRLC**\ntrlc parser + trlc_rst\n(.trlc/.rsl -> .rst; TRLCRST lib)" <> as trlc rectangle "**rst_to_trlc**\n(.rst -> .trlc)" <> as r2t rectangle "**PlantUML Parser**\nparser + linker (Rust)\n(.puml -> .fbs.bin + .lobster)" <> as puml -rectangle "**safety_analysis_tools**\nFTA preprocess + lobster\n(.puml -> root_causes.lobster)" <> as sat +rectangle "**puml_cli** (FTA mode)\ninline metamodel + extract\n(.puml -> inlined .puml +\nfta_chains.json + root_causes.lobster)" <> as fta +rectangle "**fmea_assembler**\nTRLCRST page build\n(.trlc + chains -> fmea.rst)" <> as asm rectangle "**Lobster**\nlobster-trlc / -report /\n-ci-report / gtest_report" <> as lob rectangle "**Architecture Verifier**\nvalidation_cli\n(arch.json + .fbs.bin)" <> as verifier rectangle "**Sphinx**\nscore_build + html_merge\n(.rst -> needs.json + HTML)" <> as docs @@ -57,8 +58,9 @@ req --> trlc : render + typecheck req --> lob : lobster-trlc arch --> puml : parse diagrams -fmea --> trlc : render FM/CM -fmea --> sat : FTA root causes +fmea --> fta : FTA root causes +fmea --> asm : assemble fmea.rst +asm ..> trlc : TRLCRST lib fmea --> lob : lobster-trlc unit --> lob : gtest_report diff --git a/bazel/rules/rules_score/docs/_assets/traceability_dataflow.puml b/bazel/rules/rules_score/docs/_assets/traceability_dataflow.puml index 6f54f732..a31d7820 100644 --- a/bazel/rules/rules_score/docs/_assets/traceability_dataflow.puml +++ b/bazel/rules/rules_score/docs/_assets/traceability_dataflow.puml @@ -53,7 +53,7 @@ rectangle "PASS / FAIL\n(bazel test gate)" <> as gate reqs_in --> reqs_lob : lobster-trlc api_in --> api_lob : plantuml parser fmcm_in --> fmcm_lob : lobster-trlc -fta_in --> rc_lob : safety_analysis_tools +fta_in --> rc_lob : puml_cli (FTA mode) test_in --> test_lob : gtest_report reqs_lob --> conf diff --git a/bazel/rules/rules_score/docs/tooling_architecture.rst b/bazel/rules/rules_score/docs/tooling_architecture.rst index 4838664d..a5ce61e1 100644 --- a/bazel/rules/rules_score/docs/tooling_architecture.rst +++ b/bazel/rules/rules_score/docs/tooling_architecture.rst @@ -37,8 +37,9 @@ build wires layers 2 and 3 automatically. :doc:`overview` for the provider-flow diagram. #. **Tools** — the executables each action runs. Some are vendored third-party tools (TRLC, Lobster, the PlantUML parser, Sphinx); some are local helpers - under ``src/`` (``rst_to_trlc.py``, ``safety_analysis_tools.py``, - ``sphinx_html_merge.py``). + under ``src/`` (``rst_to_trlc.py``, ``fmea_assembler.py``, + ``sphinx_html_merge.py``). The FMEA fault-tree processing lives in the Rust + ``puml_cli`` (FTA mode, backed by the ``puml_fta`` crate). Rule → tool invocation map -------------------------- @@ -80,8 +81,10 @@ are rendered under :doc:`tool_reference/index`. - ``feature_requirements``, ``component_requirements``, ``assumed_system_requirements``, ``fmea`` - Parses and type-checks requirement / FMEA records against the ``.rsl`` - metamodel, then renders them to ``.rst`` (requirements) or ``.inc`` - (FMEA sections) for Sphinx. + metamodel and renders them to ``.rst``. ``trlc_rst`` also ships a + reusable ``TRLCRST`` library that ``fmea_assembler`` links directly to + build the FMEA page from a single in-process parse (no per-record + ``.inc`` files). * - **rst_to_trlc** - ``src/rst_to_trlc.py`` (local) - ``score_requirements_rule`` macro @@ -95,13 +98,29 @@ are rendered under :doc:`tool_reference/index`. items. The **linker** merges the FlatBuffers into ``plantuml_links.json`` for the ``clickable_plantuml`` Sphinx extension. Rejects syntactically invalid diagrams with a non-zero exit code. - * - **safety_analysis_tools** - - ``//bazel/rules/rules_score:safety_analysis_tools`` - (``src/safety_analysis_tools.py``, local) + * - **puml_cli (FTA mode)** + - ``//plantuml/parser/puml_cli`` ``--fta-output-dir`` (Rust; FTA model in + the ``puml_fta`` crate) - ``fmea`` - - Inlines ``fta_metamodel.puml`` into root-cause FTA diagrams (making them - hermetic) and extracts ``$TopEvent`` / ``$BasicEvent`` calls into - ``root_causes.lobster`` in ``lobster-act-trace`` format. + - Analysis only: parses the ``$TopEvent`` / ``$BasicEvent`` / gate macro + calls of each root-cause FTA diagram into + two outputs: ``root_causes.lobster`` (``lobster-act-trace``) and + ``fta_chains.json`` (the ordered per-failure-mode chains). The authored + diagram keeps its ``!include fta_metamodel.puml``; the metamodel ships in + the docs toolchain runfiles and is put on PlantUML's global include path + (``-Dplantuml.include.path``) so the include resolves at render even under + sphinxcontrib-plantuml's ``-pipe`` mode. Unrooted basic events and + malformed TRLC aliases are reported as build warnings rather than silently + dropped. + * - **fmea_assembler** + - ``//bazel/rules/rules_score:fmea_assembler`` + (``src/fmea_assembler.py``, local; links the ``TRLCRST`` library) + - ``fmea`` + - Assembles the failure-mode-centric ``fmea.rst`` from ``fta_chains.json`` + plus the FailureMode / ControlMeasure records in one in-process TRLC + parse: an overview table, one section per failure mode (detail + inline + fault tree + that chain's control measures), and trailing "Unlinked" + sections so nothing is dropped. * - **Lobster** - ``@lobster//`` : ``lobster-trlc``, ``lobster-report``, ``lobster-ci-report``, ``lobster-html-report``, ``gtest_report``, @@ -176,7 +195,7 @@ feed that pipeline: * **Public API diagrams** (``public_api.puml``) → PlantUML parser → ``public_api.lobster`` (enables failure-mode-to-interface tracing). * **FMEA** (``failuremodes.trlc`` / ``controlmeasures.trlc``) → ``lobster-trlc``; - **FTA** (``fta.puml``) → ``safety_analysis_tools`` → ``root_causes.lobster``. + **FTA** (``fta.puml``) → ``puml_cli`` (FTA mode) → ``root_causes.lobster``. * **Unit tests** (gtest) → ``gtest_report`` → ``.lobster``. .. _two-phase-sphinx-build: @@ -267,26 +286,51 @@ self-contained. Safety analysis document pipeline ---------------------------------- -The diagram below shows how FMEA and FTA source files travel through the three -rules (``fmea`` → ``dependability_analysis`` → ``dependable_element``) and land -in the Sphinx staging tree. Blue boxes are source files authored by the -component team; orange boxes are generated files; yellow boxes are the -``SphinxSourcesInfo`` provider payloads; the purple box is the final staging -directory consumed by Sphinx. +The component diagram below shows how the FMEA **input artifacts** — authored +``.trlc`` records and ``fta_*.puml`` diagrams plus the tooling defaults +(``ScoreReq`` ``.rsl`` spec, ``fta_metamodel.puml``, ``fmea.template.rst`` and +the lobster configs) — flow through the three in-process tool actions of the +``fmea`` rule into the generated files, the providers, and finally the Sphinx +staging tree. Blue boxes are authored sources, light-blue are tooling defaults, +green components are the tool actions, orange boxes are generated files, yellow +boxes are the provider payloads, and the purple box is the staging directory +consumed by Sphinx. .. uml:: _assets/safety_analysis_doc_pipeline.puml :align: center :alt: Safety analysis document pipeline :width: 100% +The ``fmea`` rule drives three actions, all reading the input artifacts above: + +#. **puml_cli (FTA mode)** parses each ``fta_*.puml`` directly (no rewriting) + and writes ``root_causes.lobster`` and ``fta_chains.json`` (the ordered + per-failure-mode chains). The diagrams keep their ``!include + fta_metamodel.puml``; the metamodel is on PlantUML's global include path + (shipped in the docs toolchain runfiles), so it resolves at render time. +#. **fmea_assembler** consumes ``fta_chains.json`` and parses the FailureMode / + ControlMeasure ``.trlc`` records (with the ``.rsl`` spec for import + resolution) in a single in-process ``TRLCRST`` pass, expanding + ``fmea.template.rst`` into ``fmea.rst``. +#. **lobster-trlc** (run twice) turns the FailureMode and ControlMeasure records + into ``failuremodes.lobster`` / ``controlmeasures.lobster`` for the + traceability report. + ``SphinxSourcesInfo`` carries three depsets: - **srcs** — files that become top-level toctree entries in the enclosing - document section (``fmea.rst``, ``dfa.rst``). -- **deps** — all files that must be present in the staging directory: own - ``srcs`` plus ``.inc`` rendered sections and preprocessed ``.puml`` diagrams - that ``fmea.rst`` pulls in via ``.. include::`` / ``.. uml::``. + document section. ``fmea`` emits exactly one: ``fmea.rst``. +- **deps** — all files that must be present in the staging directory; for + ``fmea`` this is just ``fmea.rst``, because the page is self-contained + (failure modes and control measures are rendered inline, not pulled in via + ``.. include::``). - **aux_srcs** — files to symlink alongside ``srcs``/``deps`` but **not** added - to the outer index toctree. ``fmea`` uses this for the ``detail_*.rst`` - sub-pages, which are referenced from the inner ``.. toctree::`` inside - ``fmea.rst`` rather than from the section index. + to any toctree. ``fmea`` uses this for the authored ``fta_*.puml`` diagrams, + which ``fmea.rst`` references inline via ``.. uml::`` and which must therefore + sit beside it in the staging tree without being indexed as documents. (The + metamodel is not staged here — it resolves via PlantUML's global include + path.) + +The lobster outputs travel separately on ``AnalysisInfo.lobster_files`` +(``failuremodes.lobster``, ``controlmeasures.lobster``, ``root_causes.lobster``) +into the ``dependability_analysis`` traceability report. diff --git a/bazel/rules/rules_score/examples/seooc/safety_analysis/BUILD b/bazel/rules/rules_score/examples/seooc/safety_analysis/BUILD index e3067db8..5ef3be59 100644 --- a/bazel/rules/rules_score/examples/seooc/safety_analysis/BUILD +++ b/bazel/rules/rules_score/examples/seooc/safety_analysis/BUILD @@ -22,6 +22,7 @@ filegroup( name = "sample_fta", srcs = [ "sample_fta.puml", + "sample_fta2.puml", ], visibility = ["//visibility:public"], ) diff --git a/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_control_measures.trlc b/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_control_measures.trlc index 93d51692..da5ddd87 100644 --- a/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_control_measures.trlc +++ b/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_control_measures.trlc @@ -31,3 +31,15 @@ ScoreReq.ControlMeasure NoMoreCoffee{ description = "We shall keep a coffee reserve for emergencies" version = 1 } + +ScoreReq.ControlMeasure WatchdogTimer{ + safety = ScoreReq.Asil.D + description = "A watchdog shall detect overruns and force a safe state" + version = 1 +} + +ScoreReq.ControlMeasure DeadlineMonitor{ + safety = ScoreReq.Asil.D + description = "Per-request deadline monitoring shall flag late responses" + version = 1 +} diff --git a/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_failure_modes.trlc b/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_failure_modes.trlc index 745e0860..5737d6c8 100644 --- a/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_failure_modes.trlc +++ b/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fmea_failure_modes.trlc @@ -22,3 +22,12 @@ ScoreReq.FailureMode SampleFailureMode{ safety = ScoreReq.Asil.B interface = "SampleLibraryAPI.GetNumber" } + +ScoreReq.FailureMode SampleFailureMode2{ + guideword = ScoreReq.GuideWord.TooLate + description = "SampleFailureMode2 responds too late" + failureeffect = "Downstream consumers time out" + version = 1 + safety = ScoreReq.Asil.D + interface = "SampleLibraryAPI.GetNumber" +} diff --git a/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fta2.puml b/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fta2.puml new file mode 100644 index 00000000..ee4fed7d --- /dev/null +++ b/bazel/rules/rules_score/examples/seooc/safety_analysis/sample_fta2.puml @@ -0,0 +1,27 @@ +' ******************************************************************************* +' Copyright (c) 2025 Contributors to the Eclipse Foundation +' +' See the NOTICE file(s) distributed with this work for additional +' information regarding copyright ownership. +' +' This program and the accompanying materials are made available under the +' terms of the Apache License Version 2.0 which is available at +' https://www.apache.org/licenses/LICENSE-2.0 +' +' SPDX-License-Identifier: Apache-2.0 +' ******************************************************************************* + +@startuml + +!include fta_metamodel.puml + +' Top level (skeleton) +$TopEvent("SampleFailureMode2 responds too late", "SampleLibrary.SampleFailureMode2") + +' 2nd level gate and basic events +$OrGate("OG1", "SampleLibrary.SampleFailureMode2") + +$BasicEvent("Watchdog timer", "SampleLibrary.WatchdogTimer", "OG1") +$BasicEvent("Deadline monitor", "SampleLibrary.DeadlineMonitor", "OG1") + +@enduml diff --git a/bazel/rules/rules_score/private/fmea.bzl b/bazel/rules/rules_score/private/fmea.bzl index 0c288e3a..b5faf943 100644 --- a/bazel/rules/rules_score/private/fmea.bzl +++ b/bazel/rules/rules_score/private/fmea.bzl @@ -14,35 +14,39 @@ """ FMEA (Failure Mode and Effects Analysis) build rules for S-CORE projects. -The rule generates a single ``fmea.rst`` page (expanded from a -template) with three inline sections: - - 1. **Failure Modes** – TRLC failure-mode targets are rendered to ``.inc`` - files by trlc_rst and pulled in via ``.. include::``. - 2. **Control Measures** – TRLC control-measure targets rendered to ``.inc`` - the same way as failure modes. - 3. **Root Causes** – optional FTA PlantUML diagrams (``.puml`` / - ``.plantuml``) given via the ``root_causes`` attribute. Each diagram - is preprocessed to inline ``fta_metamodel.puml`` (making it - self-contained) and then referenced via ``.. uml::`` inside the page. - Lobster traceability items are extracted to ``{label}/root_causes.lobster``. - -Using ``.inc`` (not ``.rst``) for the helper include files keeps them out of -the Sphinx toctree (``_is_document_file`` only matches ``.rst``/``.md``) while -``_filter_doc_files`` in ``dependable_element.bzl`` still symlinks them -alongside ``fmea.rst`` so ``.. include::`` resolves at build time. +The rule generates a single, failure-mode-centric ``fmea.rst`` page: an +overview summary table followed by one section per failure mode. Each section +carries the full failure-mode safety attributes, the fault-tree diagram inline +(``.. uml::``), and a "Control Measures" subsection holding only that chain's +basic events. Failure modes and control measures not referenced by any fault +tree are appended under trailing "Unlinked …" sections. + +Pipeline: + + 1. **FTA** (``puml_cli`` in ``--fta-output-dir`` mode) – inlines + ``fta_metamodel.puml`` into each ``root_causes`` diagram and emits the + metamodel-inlined ``.puml`` (for ``.. uml::``), ``root_causes.lobster`` + (``lobster-act-trace``) and ``fta_chains.json`` (the ordered per-failure + mode chains). + 2. **Assembly** (``fmea_assembler``) – a single in-process TRLC parse via the + extended ``TRLCRST`` library renders the overview table and every chain + section into ``fmea.rst``. + 3. **Lobster** (``lobster-trlc``) – FailureMode and ControlMeasure + traceability files, unchanged. + +The metamodel-inlined ``.puml`` diagrams travel as ``aux_srcs`` so Sphinx can +resolve ``.. uml::`` without adding them to the toctree. ``AnalysisInfo`` carries all lobster traceability files (failuremodes, -controlmeasures, and root_causes if present) as a ``lobster_files`` dict -keyed by canonical filename (e.g. ``{"failuremodes.lobster": File, ...}``). -All Sphinx source files travel via ``SphinxSourcesInfo``. +controlmeasures, and root_causes if present) as a ``lobster_files`` dict keyed +by canonical filename. All Sphinx source files travel via +``SphinxSourcesInfo``. -This is a **build-only** rule. The combined traceability *test* is owned -by the ``dependability_analysis`` rule which wraps this one. +This is a **build-only** rule. The combined traceability *test* is owned by the +``dependability_analysis`` rule which wraps this one. """ load("//bazel/rules/rules_score:providers.bzl", "AnalysisInfo", "ArchitecturalDesignInfo", "SphinxSourcesInfo") -load("//bazel/rules/rules_score/private:puml_utils.bzl", "make_puml_rst_wrappers") load("//bazel/rules/rules_score/private:verbosity.bzl", "VERBOSITY_ATTR", "get_log_level") # ============================================================================ @@ -50,120 +54,85 @@ load("//bazel/rules/rules_score/private:verbosity.bzl", "VERBOSITY_ATTR", "get_l # ============================================================================ def _process_root_causes(ctx): - """Preprocess FTA diagrams (inline metamodel) and extract lobster items in one action. + """Extract FTA traceability + chains and stage the diagrams for rendering. - Args: - ctx: Rule context. Reads ``ctx.files.root_causes``, - ``ctx.file._fta_metamodel``, and - ``ctx.executable._safety_analysis_tools``. + ``puml_cli`` (FTA mode) parses the ``$TopEvent``/``$BasicEvent``/gate macro + calls straight from each diagram and emits, into ``{label}/``: + + * ``root_causes.lobster`` (``lobster-act-trace`` traceability), and + * ``fta_chains.json`` (the ordered per-failure-mode chains). + + The diagrams are *not* rewritten: each source ``.puml`` is symlinked next to + ``fmea.rst`` so ``.. uml:: `` resolves to the authored diagram. + Its ``!include fta_metamodel.puml`` is resolved at render time via the docs + toolchain's global PlantUML include path (the metamodel is shipped with + ``//tools/sphinx:sphinx-build``), so the metamodel is not staged here. Returns: - Tuple ``(preprocessed_diagrams, detail_rsts, [root_causes_lobster], rst_section_text)``. - All lists are empty and the section text is ``""`` when there are no - PlantUML root-cause inputs. + Tuple ``(diagram_aux_files, root_causes_lobster_or_None, chains_json)``. + ``diagram_aux_files`` (the staged ``.puml`` diagrams) is empty when there + are no PlantUML inputs; ``chains_json`` is always a File (an empty ``[]`` + array when there are no diagrams). """ puml_inputs = [ f for f in ctx.files.root_causes if f.extension in ("puml", "plantuml") ] + + chains_json = ctx.actions.declare_file("{}/fta_chains.json".format(ctx.label.name)) + if not puml_inputs: - return [], [], [], "" + # No fault trees: emit an empty chains file so the assembler still runs + # (rendering every failure mode without an FTA). + ctx.actions.write(chains_json, "[]\n") + return [], None, chains_json - # Declare one preprocessed output per input diagram (same directory). - preprocessed_diagrams = [ - ctx.actions.declare_file("{}/{}".format(ctx.label.name, src.basename)) - for src in puml_inputs - ] - root_causes_lobster = ctx.actions.declare_file( - "{}/root_causes.lobster".format(ctx.label.name), - ) + # Symlink each authored diagram next to fmea.rst so ``.. uml:: `` + # resolves in the Sphinx tree. + diagram_aux_files = [] + for src in puml_inputs: + staged = ctx.actions.declare_file("{}/{}".format(ctx.label.name, src.basename)) + ctx.actions.symlink(output = staged, target_file = src) + diagram_aux_files.append(staged) + + root_causes_lobster = ctx.actions.declare_file("{}/root_causes.lobster".format(ctx.label.name)) - # Single action: preprocess every diagram and extract lobster traceability. - output_dir = preprocessed_diagrams[0].dirname args = ctx.actions.args() - args.add("--metamodel", ctx.file._fta_metamodel) - args.add("--output-dir", output_dir) - args.add("--lobster", root_causes_lobster) + for src in puml_inputs: + args.add("--file", src.path) + args.add("--fta-output-dir", root_causes_lobster.dirname) args.add("--log-level", get_log_level(ctx)) - args.add_all(puml_inputs) ctx.actions.run( - inputs = puml_inputs + [ctx.file._fta_metamodel], - outputs = preprocessed_diagrams + [root_causes_lobster], - executable = ctx.executable._safety_analysis_tools, + inputs = puml_inputs, + outputs = [root_causes_lobster, chains_json], + executable = ctx.executable._puml_cli, arguments = [args], progress_message = "Processing root cause FTA diagrams for %s" % ctx.label.name, ) - # Generate one detail RST per preprocessed FTA diagram via the shared - # puml_diagram template. The "fta_" prefix is stripped from the stem so - # the page is titled e.g. "Server Not Listening" instead of - # "Fta Server Not Listening". - detail_rsts = make_puml_rst_wrappers( - ctx, - preprocessed_diagrams, - ctx.label.name, - ctx.file._puml_rst_template, - strip_prefix = "fta_", - filename_prefix = "detail_", - ) - - # Build toctree entries directly from the declared RST wrapper filenames so - # the toctree is always consistent with what make_puml_rst_wrappers produces, - # regardless of any prefix convention on the input files. - toctree_entries = [ - " " + rst.basename[:-4] # strip ".rst" - for rst in detail_rsts - ] - - root_causes_rst_section = ( - "Root Cause Analysis\n-------------------\n\n" + - ".. toctree::\n :maxdepth: 1\n\n" + - "\n".join(toctree_entries) + "\n" - ) - - return preprocessed_diagrams, detail_rsts, [root_causes_lobster], root_causes_rst_section + return diagram_aux_files, root_causes_lobster, chains_json # ============================================================================ -# Private Helpers +# Lobster (TRLC traceability) helper # ============================================================================ -def _render_trlc_inc(ctx, trlc_files, spec_files, out_name): - """Render a list of ``.trlc`` source files to an ``.inc`` file via trlc_rst. - - The ``.inc`` extension means the file is symlinked into the output - directory (via ``_filter_doc_files``) but is NOT added to any Sphinx - toctree (``_is_document_file`` only matches ``.rst`` / ``.md``). - - Args: - ctx: Rule context. - trlc_files: List of ``.trlc`` File objects to render. - spec_files: List of ``.rsl`` spec File objects needed for TRLC import - resolution (passed as sandbox inputs only). - out_name: Output filename (e.g. ``"failuremodes.inc"``). - - Returns: - Declared ``.inc`` output File inside ``{label.name}/``, or ``None`` - when ``trlc_files`` is empty. - """ +def _lobster_trlc(ctx, trlc_files, config, out_name): + """Run ``lobster-trlc`` over *trlc_files* producing ``{label}/``.""" if not trlc_files: return None - rendered = ctx.actions.declare_file( - "{}/{}".format(ctx.label.name, out_name), - ) + out = ctx.actions.declare_file("{}/{}".format(ctx.label.name, out_name)) args = ctx.actions.args() - args.add("--output", rendered.path) - args.add("--input-dir", ".") - args.add("--title", "") - args.add("--source-files") - args.add_all(trlc_files) + args.add("--config", config.path) + args.add("--out", out.path) ctx.actions.run( - inputs = trlc_files + spec_files, - outputs = [rendered], + inputs = trlc_files + ctx.files.spec + [config], + outputs = [out], + executable = ctx.executable._lobster_trlc, arguments = [args], - executable = ctx.executable._renderer, + progress_message = "lobster-trlc {}".format(out.path), ) - return rendered + return out # ============================================================================ # Private Rule Implementation @@ -172,141 +141,73 @@ def _render_trlc_inc(ctx, trlc_files, spec_files, out_name): def _fmea_impl(ctx): output_files = [] - # ------------------------------------------------------------------------- - # 0. Process root causes (FTA diagrams) if provided - # ------------------------------------------------------------------------- - preprocessed_diagrams, detail_rsts, root_cause_lobster_files, root_causes_rst_section = _process_root_causes(ctx) - output_files.extend(preprocessed_diagrams) - output_files.extend(detail_rsts) - - # ------------------------------------------------------------------------- - # 1. Render failure modes: TRLC -> .inc via trlc_rst - # ------------------------------------------------------------------------- - spec_files = ctx.files.spec - fm_inc = _render_trlc_inc(ctx, ctx.files.failuremodes, spec_files, "failuremodes.inc") - failuremodes_inc = [fm_inc] if fm_inc else [] - output_files.extend(failuremodes_inc) - - # ------------------------------------------------------------------------- - # 2. Render control measures: TRLC -> .inc via trlc_rst - # ------------------------------------------------------------------------- - cm_inc = _render_trlc_inc(ctx, ctx.files.controlmeasures, spec_files, "controlmeasures.inc") - controlmeasures_inc = [cm_inc] if cm_inc else [] - output_files.extend(controlmeasures_inc) - - # ------------------------------------------------------------------------- - # 3. Run lobster-trlc on TRLC sources -> lobster files. - # Spec files must be sandbox inputs so the TRLC parser can resolve - # ``import ScoreReq`` etc. - # ------------------------------------------------------------------------- - failuremodes_lobster_files = [] - if ctx.files.failuremodes: - failuremodes_lobster = ctx.actions.declare_file( - "{}/failuremodes.lobster".format(ctx.label.name), - ) - args = ctx.actions.args() - args.add("--config", ctx.file._fm_lobster_config.path) - args.add("--out", failuremodes_lobster.path) - ctx.actions.run( - inputs = ctx.files.failuremodes + spec_files + [ctx.file._fm_lobster_config], - outputs = [failuremodes_lobster], - executable = ctx.executable._lobster_trlc, - arguments = [args], - progress_message = "lobster-trlc {}".format(failuremodes_lobster.path), - ) - failuremodes_lobster_files.append(failuremodes_lobster) - - controlmeasures_lobster_files = [] - if ctx.files.controlmeasures: - controlmeasures_lobster = ctx.actions.declare_file( - "{}/controlmeasures.lobster".format(ctx.label.name), - ) - args = ctx.actions.args() - args.add("--config", ctx.file._cm_lobster_config.path) - args.add("--out", controlmeasures_lobster.path) - ctx.actions.run( - inputs = ctx.files.controlmeasures + spec_files + [ctx.file._cm_lobster_config], - outputs = [controlmeasures_lobster], - executable = ctx.executable._lobster_trlc, - arguments = [args], - progress_message = "lobster-trlc {}".format(controlmeasures_lobster.path), - ) - controlmeasures_lobster_files.append(controlmeasures_lobster) - - # ------------------------------------------------------------------------- - # 4. Generate fmea.rst via expand_template - # ------------------------------------------------------------------------- - fmea_rst = ctx.actions.declare_file( - "{}/fmea.rst".format(ctx.label.name), - ) + # 0. FTA: extract chains/lobster + stage diagrams (and metamodel) for rendering. + diagram_aux_files, root_causes_lobster, chains_json = _process_root_causes(ctx) + output_files.extend(diagram_aux_files) + # 1. Assemble fmea.rst from the chains + TRLC records (single in-process parse). + fmea_rst = ctx.actions.declare_file("{}/fmea.rst".format(ctx.label.name)) title = ctx.label.name - failure_modes_rst_includes = "\n\n".join( - [".. include:: " + f.basename for f in failuremodes_inc], - ) - control_measures_rst_includes = "\n\n".join( - [".. include:: " + f.basename for f in controlmeasures_inc], - ) - - failure_modes_section = "" - if failuremodes_inc: - failure_modes_section = "Failure Modes\n-------------\n\n" + failure_modes_rst_includes - - control_measures_section = "" - if controlmeasures_inc: - control_measures_section = "Control Measures\n----------------\n\n" + control_measures_rst_includes - - ctx.actions.expand_template( - template = ctx.file._template, - output = fmea_rst, - substitutions = { - "{title}": title, - "{underline}": "=" * len(title), - "{failure_modes_section}": failure_modes_section, - "{control_measures_section}": control_measures_section, - "{root_causes_section}": root_causes_rst_section, - }, + args = ctx.actions.args() + args.add("--output", fmea_rst.path) + args.add("--template", ctx.file._template.path) + args.add("--title", title) + args.add("--chains", chains_json.path) + args.add("--log-level", get_log_level(ctx)) + if ctx.files.failuremodes: + args.add("--failuremodes") + args.add_all(ctx.files.failuremodes) + if ctx.files.controlmeasures: + args.add("--controlmeasures") + args.add_all(ctx.files.controlmeasures) + if ctx.files.spec: + args.add("--spec") + args.add_all(ctx.files.spec) + ctx.actions.run( + inputs = ( + ctx.files.failuremodes + + ctx.files.controlmeasures + + ctx.files.spec + + [chains_json, ctx.file._template] + ), + outputs = [fmea_rst], + executable = ctx.executable._fmea_assembler, + arguments = [args], + progress_message = "Assembling FMEA page for %s" % ctx.label.name, ) output_files.append(fmea_rst) - # ------------------------------------------------------------------------- - # 5. Build providers - # ------------------------------------------------------------------------- + # 2. lobster-trlc traceability for FailureMode / ControlMeasure records. + fm_lobster = _lobster_trlc(ctx, ctx.files.failuremodes, ctx.file._fm_lobster_config, "failuremodes.lobster") + cm_lobster = _lobster_trlc(ctx, ctx.files.controlmeasures, ctx.file._cm_lobster_config, "controlmeasures.lobster") + + # 3. Providers. lobster_files = {} - for f in failuremodes_lobster_files: - lobster_files["failuremodes.lobster"] = f - for f in controlmeasures_lobster_files: - lobster_files["controlmeasures.lobster"] = f - for f in root_cause_lobster_files: - lobster_files["root_causes.lobster"] = f - - # detail_rsts are NOT top-level toctree entries (they live in sub-toctrees - # within fmea.rst), but they must be symlinked alongside fmea.rst so Sphinx - # can resolve the toctree references. They go into aux_srcs so that - # dependable_element symlinks them without adding them to the outer index. - toctree_files = [f for f in output_files if f not in detail_rsts] - all_sphinx_srcs = depset(toctree_files) - - # Only include fmea's own generated files in the sphinx deps. arch_design - # files are handled separately by dependable_element via its - # architectural_design attribute, so omitting them here avoids their RST - # wrappers being symlinked into the dependability_analysis/ section as - # orphaned documents. - sphinx_deps = [all_sphinx_srcs] + if fm_lobster: + lobster_files["failuremodes.lobster"] = fm_lobster + if cm_lobster: + lobster_files["controlmeasures.lobster"] = cm_lobster + if root_causes_lobster: + lobster_files["root_causes.lobster"] = root_causes_lobster + + # The preprocessed .puml diagrams are referenced inline via ``.. uml::`` but + # must not be toctree documents, so they travel as aux_srcs (symlinked + # alongside fmea.rst by dependable_element without being indexed). + sphinx_srcs = depset([fmea_rst]) return [ DefaultInfo( - files = depset(output_files), + files = depset(output_files + [v for v in lobster_files.values()]), ), AnalysisInfo( name = ctx.label.name, lobster_files = lobster_files, ), SphinxSourcesInfo( - srcs = all_sphinx_srcs, - deps = depset(transitive = sphinx_deps), - aux_srcs = depset(detail_rsts), + srcs = sphinx_srcs, + deps = depset(transitive = [sphinx_srcs]), + aux_srcs = depset(diagram_aux_files), ), ] @@ -316,7 +217,8 @@ def _fmea_impl(ctx): _fmea = rule( implementation = _fmea_impl, - doc = "Renders FMEA TRLC sources to .inc files and generates lobster traceability files. " + + doc = "Renders a failure-mode-centric FMEA page (overview table + one chain " + + "section per failure mode) and lobster traceability files. " + "Build-only rule; traceability testing is owned by dependability_analysis.", attrs = dict( { @@ -348,23 +250,20 @@ _fmea = rule( mandatory = False, doc = "Reference to architectural_design target for traceability.", ), - "_safety_analysis_tools": attr.label( - default = Label("//bazel/rules/rules_score:safety_analysis_tools"), + "_puml_cli": attr.label( + default = Label("//plantuml/parser/puml_cli:puml_cli"), executable = True, allow_files = True, cfg = "exec", - doc = "safety_analysis_tools binary: preprocess and extract subcommands.", - ), - "_fta_metamodel": attr.label( - default = Label("//plantuml:fta_metamodel"), - allow_single_file = True, - doc = "fta_metamodel.puml whose content is inlined into root cause diagrams.", + doc = "puml_cli binary used in FTA mode to inline the metamodel and " + + "extract root_causes.lobster + fta_chains.json.", ), - "_renderer": attr.label( - default = Label("@trlc//tools/trlc_rst:trlc_rst"), + "_fmea_assembler": attr.label( + default = Label("//bazel/rules/rules_score:fmea_assembler"), executable = True, allow_files = True, cfg = "exec", + doc = "FMEA page assembler (imports the extended TRLCRST library).", ), "_lobster_trlc": attr.label( default = Label("@lobster//:lobster-trlc"), @@ -386,12 +285,7 @@ _fmea = rule( "_template": attr.label( default = Label("//bazel/rules/rules_score:templates/fmea.template.rst"), allow_single_file = True, - doc = "RST template for the FMEA page.", - ), - "_puml_rst_template": attr.label( - default = Label("//bazel/rules/rules_score:templates/puml_diagram.template.rst"), - allow_single_file = True, - doc = "RST template for PlantUML diagram wrapper pages.", + doc = "RST template for the FMEA page (single ``{body}`` placeholder).", ), }, **VERBOSITY_ATTR @@ -412,9 +306,9 @@ def fmea( **kwargs): """Define FMEA (Failure Mode and Effects Analysis) following S-CORE process guidelines. - Generates a single ``fmea.rst`` page with up to three sections: - Failure Modes (TRLC), Control Measures (TRLC), and optionally a - Root Causes section with FTA PlantUML diagrams. + Generates a single, failure-mode-centric ``fmea.rst`` page: an overview + summary table followed by one section per failure mode (failure-mode detail, + the fault tree inline, and that chain's control measures). FTA diagrams passed via ``root_causes`` are preprocessed to inline ``fta_metamodel.puml`` (hermetic, no ``!include`` at render time) and @@ -434,8 +328,7 @@ def fmea( root_causes: Optional FTA PlantUML diagram files (``.puml`` / ``.plantuml``) representing the root causes of failure modes. arch_design: Optional ``architectural_design`` target for traceability. - visibility: Bazel visibility. - tags: Additional Bazel tags. + **kwargs: Additional arguments (e.g. ``visibility``, ``tags``). """ _fmea( name = name, diff --git a/bazel/rules/rules_score/src/fmea_assembler.py b/bazel/rules/rules_score/src/fmea_assembler.py new file mode 100644 index 00000000..8f086c48 --- /dev/null +++ b/bazel/rules/rules_score/src/fmea_assembler.py @@ -0,0 +1,381 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +"""Assemble a failure-mode-centric ``fmea.rst`` page. + +The page is pivoted around the safety chain: an overview summary table followed +by one section per failure mode, each containing the failure-mode detail, the +fault-tree diagram inline, and a "Control Measures" subsection holding only that +chain's basic events. Failure modes and control measures not referenced by any +fault tree are appended under trailing "Unlinked …" sections so nothing is +dropped. + +A single in-process TRLC parse (via the extended ``TRLCRST`` library) backs the +whole page — no per-record Bazel actions and no ``.inc`` splitting. +""" + +import argparse +import dataclasses +import json +import logging +import re +import sys + +from trlc_rst import TRLCRST, TRLCParseError + +logger = logging.getLogger(__name__) + +_LEVEL_MAP = { + "error": logging.ERROR, + "warn": logging.WARNING, + "info": logging.INFO, + "debug": logging.DEBUG, +} + +# Global Control Measures summary table columns. +_CM_TABLE_COLUMNS = {"safety": "ASIL", "description": "Description"} +# Overview summary table columns (one row per failure mode). +_FM_TABLE_COLUMNS = { + "guideword": "Guideword", + "safety": "ASIL", + "interface": "Interface", +} + +_OVERVIEW_TITLE = "Overview" +_FAILURE_MODES_TITLE = "Failure Modes" +_CONTROL_MEASURES_TITLE = "Control Measures" +_ROOT_CAUSE_TITLE = "Root Cause Analysis" + +# ASIL value -> sphinx-design badge role (severity-coloured). +_ASIL_BADGE = { + "QM": "bdg-secondary", + "B": "bdg-warning", + "D": "bdg-danger", +} +_DEFAULT_BADGE = "bdg-secondary" +_GUIDEWORD_BADGE = "bdg-info" + + +def _heading(text: str, char: str) -> str: + return f"{text}\n{char * len(text)}\n" + + +def _indent(text: str, n: int = 3) -> str: + """Indent every non-empty line of *text* by *n* spaces (for nesting under a + directive); blank lines are kept empty.""" + pad = " " * n + return "\n".join(pad + line if line.strip() else "" for line in text.splitlines()) + + +def _anchor(fqn: str) -> str: + """Sphinx cross-reference label derived from a fully-qualified name.""" + return "fmea-" + re.sub(r"[^0-9a-zA-Z]+", "-", fqn).strip("-").lower() + + +def _ref(fqn: str, name: str) -> str: + return f":ref:`{name} <{_anchor(fqn)}>`" + + +@dataclasses.dataclass +class _Directive: + """A renderable RST / sphinx-design directive node. + + Indentation and blank-line separation are handled centrally in + :meth:`render`, so element builders stay declarative trees instead of + hand-concatenated strings. ``body`` items may be nested ``_Directive``\\ s + or raw RST string blocks (e.g. a trlc_rst-rendered table or description). + """ + + name: str + arg: str = "" + options: dict = dataclasses.field(default_factory=dict) + body: list = dataclasses.field(default_factory=list) + + def render(self) -> str: + lines = [f".. {self.name}::" + (f" {self.arg}" if self.arg else "")] + for key, value in self.options.items(): + lines.append(f" :{key}: {value}") + blocks = [b for b in (_render_block(x) for x in self.body if x) if b] + if blocks: + lines.append("") + lines.append(_indent("\n\n".join(blocks))) + return "\n".join(lines) + + +def _render_block(block) -> str: + """Render a nested directive node or a raw RST string block.""" + if isinstance(block, _Directive): + return block.render() + return str(block).rstrip("\n") + + +# --- sphinx-design element builders (declarative; no manual indentation) ---- + + +def _grid(items: list, columns: int = 2, gutter: int | None = None) -> _Directive: + options = {} if gutter is None else {"gutter": gutter} + return _Directive("grid", str(columns), options, list(items)) + + +def _grid_item(body, options: dict | None = None) -> _Directive: + """A bare grid cell (no card chrome).""" + return _Directive("grid-item", options=options or {}, body=[body]) + + +def _card(title: str, body) -> _Directive: + """A ``grid-item-card``; *title* may be empty for a header-less card.""" + return _Directive( + "grid-item-card", title, body=body if isinstance(body, list) else [body] + ) + + +def _badge(role: str, text: str) -> str: + return f":{role}:`{text}`" + + +# --------------------------------------------------------------------------- +# Element renderers — each returns a directive node (or None) +# --------------------------------------------------------------------------- + + +def _attr_grid(obj: object) -> _Directive | None: + """FM attributes: guideword/ASIL as centred chips (no card chrome), + interface/failure-effect as titled cards; a gutter separates the rows.""" + fields = obj.to_python_dict() + items = [] + guideword = fields.get("guideword") + if guideword: + items.append( + _grid_item(_badge(_GUIDEWORD_BADGE, guideword), {"class": "sd-text-center"}) + ) + safety = fields.get("safety") + if safety: + role = _ASIL_BADGE.get(safety, _DEFAULT_BADGE) + items.append( + _grid_item(_badge(role, f"ASIL {safety}"), {"class": "sd-text-center"}) + ) + for field_name, label in ( + ("interface", "Interface"), + ("failureeffect", "Failure Effect"), + ): + value = fields.get(field_name) + if value: + items.append(_card(label, value)) + return _grid(items, columns=2, gutter=3) if items else None + + +def _description_card(renderer: TRLCRST, fqn: str) -> _Directive | None: + """Description as a prominent card in a ``grid:: 1`` so its borders align + with the attribute grid above.""" + description = renderer.field_value_for(fqn, "description") + if not description: + return None + return _grid([_card("Description", description)], columns=1) + + +def _cm_card(renderer: TRLCRST, fqn: str, obj: object) -> _Directive: + """One control-measure card: bold ID with inline ASIL badge, then description. + + Both the ID line and the description are direct text content of the card + (not nested grids), so they align at the same indentation as the Description + card body above. + """ + safety = obj.to_python_dict().get("safety", "") + badge_str = ( + " " + _badge(_ASIL_BADGE.get(safety, _DEFAULT_BADGE), f"ASIL {safety}") + if safety + else "" + ) + header_text = f"**{obj.name}**{badge_str}" + description = renderer.field_value_for(fqn, "description") + body: list = [header_text] + if description: + body.append(description) + return _card("", body) + + +def _cm_grid(renderer: TRLCRST, obj_map: dict, cms: list[str]) -> _Directive | None: + cards = [_cm_card(renderer, fqn, obj_map[fqn]) for fqn in cms if fqn in obj_map] + return _grid(cards, columns=1) if cards else None + + +def _fm_dropdown( + renderer: TRLCRST, fqn: str, obj: object, chain: dict | None +) -> _Directive: + """One collapsible failure-mode dropdown. + + *chain* is ``None`` for an orphan failure mode (no fault tree); the Root + Cause Analysis and Control Measures parts are then omitted. + """ + body = [_attr_grid(obj), _description_card(renderer, fqn)] + if chain is not None: + body.append(_Directive("rubric", _ROOT_CAUSE_TITLE)) + body.append(_Directive("uml", chain["puml"])) + cms = _cm_grid( + renderer, renderer.objects_by_fqn(), chain.get("control_measures", []) + ) + if cms is not None: + body.append(_Directive("rubric", _CONTROL_MEASURES_TITLE)) + body.append(cms) + return _Directive("dropdown", fqn, {"name": _anchor(fqn)}, body) + + +# --------------------------------------------------------------------------- +# Section renderers — top-level page sections (return RST strings) +# --------------------------------------------------------------------------- + + +def _validate_chain(chain) -> str: + """Return a chain's ``fm_fqn`` or raise ``ValueError`` if it is malformed.""" + try: + fqn = chain["fm_fqn"] + chain["puml"] + except (KeyError, TypeError) as exc: + raise ValueError( + f"Malformed chain entry {chain!r} in fta_chains.json: " + f"missing required key {exc}" + ) from exc + return fqn + + +def _render_overview(renderer: TRLCRST, fm_fqns: list[str]) -> str: + if not fm_fqns: + return "" + table = renderer.render_table_to_string( + _FM_TABLE_COLUMNS, fqns=fm_fqns, name_header="Failure Mode", link_fn=_ref + ) + return _heading(_OVERVIEW_TITLE, "-") + "\n" + table + + +def _render_failure_modes( + renderer: TRLCRST, chains: list, obj_map: dict, fm_fqns: list[str] +) -> str: + dropdowns = [] + linked = set() + for chain in chains: + fqn = _validate_chain(chain) + if fqn in obj_map: + linked.add(fqn) + dropdowns.append(_fm_dropdown(renderer, fqn, obj_map[fqn], chain)) + else: + logger.warning( + "fta_chains.json references unknown FailureMode %r; " + "no matching TRLC record — chain skipped", + fqn, + ) + # Orphan failure modes (no fault tree) still render, without FTA / CMs. + for fqn in fm_fqns: + if fqn not in linked: + dropdowns.append(_fm_dropdown(renderer, fqn, obj_map[fqn], None)) + body = "\n\n".join(d.render() for d in dropdowns) + return _heading(_FAILURE_MODES_TITLE, "-") + "\n" + body + "\n" + + +def _render_control_measures(renderer: TRLCRST, cm_fqns: list[str]) -> str: + if not cm_fqns: + return "" + table = renderer.render_table_to_string( + _CM_TABLE_COLUMNS, fqns=cm_fqns, name_header="Control Measure" + ) + return _heading(_CONTROL_MEASURES_TITLE, "-") + "\n" + table + + +def _build_body(renderer: TRLCRST, chains: list, title: str) -> str: + obj_map = renderer.objects_by_fqn() + fm_fqns = [fqn for fqn, obj in obj_map.items() if obj.n_typ.name == "FailureMode"] + cm_fqns = [ + fqn for fqn, obj in obj_map.items() if obj.n_typ.name == "ControlMeasure" + ] + + sections = [ + _heading(title, "="), + _render_overview(renderer, fm_fqns), + _render_failure_modes(renderer, chains, obj_map, fm_fqns), + _render_control_measures(renderer, cm_fqns), + ] + return "\n".join(s for s in sections if s) + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--output", required=True, help="Output fmea.rst path.") + parser.add_argument("--template", required=True, help="RST template path.") + parser.add_argument("--title", required=True, help="Page title.") + parser.add_argument( + "--chains", + required=True, + help="fta_chains.json produced by puml_cli FTA mode.", + ) + parser.add_argument( + "--failuremodes", nargs="*", default=[], help="FailureMode .trlc files." + ) + parser.add_argument( + "--controlmeasures", + nargs="*", + default=[], + help="ControlMeasure .trlc files.", + ) + parser.add_argument( + "--spec", + nargs="*", + default=[], + help="TRLC .rsl/.trlc spec files for import resolution.", + ) + parser.add_argument( + "--log-level", + choices=["error", "warn", "info", "debug"], + default="warn", + dest="log_level", + help="Log level for tool output (default: warn).", + ) + args = parser.parse_args() + + logging.basicConfig( + level=_LEVEL_MAP[args.log_level], format="%(levelname)s: %(message)s" + ) + + try: + with open(args.chains, encoding="utf-8") as fh: + chains = json.load(fh) + except (OSError, json.JSONDecodeError) as exc: + logger.error("Error reading chains file %s: %s", args.chains, exc) + sys.exit(1) + + source_files = list(args.failuremodes) + list(args.controlmeasures) + renderer = TRLCRST( + input_directory=None, + source_files=source_files, + dep_files=list(args.spec), + ) + try: + renderer.parse_trlc_files() + except TRLCParseError as exc: + logger.error("TRLC parse error: %s", exc) + sys.exit(1) + + body = _build_body(renderer, chains, args.title) + + with open(args.template, encoding="utf-8", newline="") as fh: + template = fh.read() + if "{body}" not in template: + logger.error( + "Template %r does not contain a '{body}' placeholder", args.template + ) + sys.exit(1) + rendered = template.replace("{body}", body) + + with open(args.output, "w", newline="", encoding="utf-8") as fh: + fh.write(rendered) + + +if __name__ == "__main__": + main() diff --git a/bazel/rules/rules_score/src/safety_analysis_tools.py b/bazel/rules/rules_score/src/safety_analysis_tools.py deleted file mode 100644 index 6101b4af..00000000 --- a/bazel/rules/rules_score/src/safety_analysis_tools.py +++ /dev/null @@ -1,291 +0,0 @@ -# ******************************************************************************* -# Copyright (c) 2026 Contributors to the Eclipse Foundation -# -# See the NOTICE file(s) distributed with this work for additional -# information regarding copyright ownership. -# -# This program and the accompanying materials are made available under the -# terms of the Apache License Version 2.0 which is available at -# https://www.apache.org/licenses/LICENSE-2.0 -# -# SPDX-License-Identifier: Apache-2.0 -# ******************************************************************************* -""" -FTA PlantUML lobster linker. - -Parses PlantUML FTA diagrams and extracts ``$TopEvent`` and ``$BasicEvent`` -procedure calls, producing a ``.lobster`` file in ``lobster-act-trace`` format. - -Each extracted item uses the *alias* (second argument) as its tag and name -because the alias is the TRLC fully-qualified name of the corresponding -safety-analysis record (e.g. ``SampleLibrary.SampleFailureMode``). A ``refs`` -entry pointing at ``req `` links the FTA artifact back to the -matching TRLC requirement in the traceability chain. - -Supported call patterns (single-line, double-quoted args):: - - $TopEvent("Human readable name", "Namespace.RecordName") - $BasicEvent("Human readable name", "Namespace.RecordName", "GateAlias") -""" - -import argparse -import json -import logging -from pathlib import Path - -logger = logging.getLogger(__name__) - -_LEVEL_MAP = { - "error": logging.ERROR, - "warn": logging.WARNING, - "info": logging.INFO, - "debug": logging.DEBUG, -} - -LOBSTER_GENERATOR = "safety_analysis_tools" -LOBSTER_SCHEMA = "lobster-act-trace" -LOBSTER_VERSION = 3 - -# --------------------------------------------------------------------------- -# PlantUML call parser -# TODO: Replace with Plantuml Parser -# --------------------------------------------------------------------------- - -# Procedure names and the indices of (name_arg, alias_arg) within their arg list. -_FTA_PROCEDURES: tuple[tuple[str, str, int, int], ...] = ( - ("$TopEvent", "TopEvent", 0, 1), - ("$BasicEvent", "BasicEvent", 0, 1), -) - - -def _parse_quoted_args(line: str, proc_name: str) -> list[str] | None: - """Extract double-quoted string arguments from a PlantUML procedure call. - - Finds ``proc_name(...)`` in *line*, then collects every double-quoted token - inside the parentheses. Returns ``None`` if the procedure is not present. - """ - marker = proc_name + "(" - call_start = line.find(marker) - if call_start == -1: - return None - paren_open = call_start + len(marker) - 1 - paren_close = line.find(")", paren_open + 1) - if paren_close == -1: - return None - inside = line[paren_open + 1 : paren_close] - args: list[str] = [] - pos = 0 - while pos < len(inside): - q_open = inside.find('"', pos) - if q_open == -1: - break - q_close = inside.find('"', q_open + 1) - if q_close == -1: - break - args.append(inside[q_open + 1 : q_close]) - pos = q_close + 1 - return args if args else None - - -def _is_valid_trlc_fqn(alias: str) -> bool: - """Return True when *alias* looks like ``Package.RecordName``.""" - parts = alias.split(".") - if len(parts) != 2: - return False - return all( - part - and (part[0].isalpha() or part[0] == "_") - and all(c.isalnum() or c == "_" for c in part) - for part in parts - ) - - -# --------------------------------------------------------------------------- -# Parser -# --------------------------------------------------------------------------- - - -def extract_fta_items(puml_file: str) -> list[dict]: - """Parse a PlantUML FTA file and return lobster trace items. - - Args: - puml_file: Path to the ``.puml`` file to parse. - - Returns: - List of lobster item dicts in ``lobster-act-trace`` format. - """ - path = Path(puml_file) - try: - content = path.read_text(encoding="utf-8") - except OSError: - logger.exception("Cannot read '%s'", puml_file) - raise - - items: list[dict] = [] - - for line_number, line in enumerate(content.splitlines(), start=1): - for proc_name, kind, name_idx, alias_idx in _FTA_PROCEDURES: - call_args = _parse_quoted_args(line, proc_name) - if call_args is None or len(call_args) <= max(name_idx, alias_idx): - continue - name = call_args[name_idx] - alias = call_args[alias_idx] - if not _is_valid_trlc_fqn(alias): - logger.warning( - "%s:%d: alias %r does not look like a valid " - "TRLC fully-qualified name (expected 'Package.Record')", - puml_file, - line_number, - alias, - ) - items.append( - { - "tag": f"fta {alias}", - "location": { - "kind": "file", - "file": str(path), - "line": line_number, - "column": None, - }, - "name": alias, - "messages": [], - "just_up": [], - "just_down": [], - "just_global": [], - "refs": [f"req {alias}"], - "framework": "PlantUML", - "kind": kind, - } - ) - logger.debug( - "Found %s: alias=%r name=%r at line %d", - kind, - alias, - name, - line_number, - ) - break # one match per line - - if not items: - logger.warning("No FTA events found in '%s'", puml_file) - - return items - - -def create_lobster_output(items: list[dict]) -> dict: - """Wrap items in the standard lobster JSON envelope.""" - return { - "data": items, - "generator": LOBSTER_GENERATOR, - "schema": LOBSTER_SCHEMA, - "version": LOBSTER_VERSION, - } - - -# --------------------------------------------------------------------------- -# PlantUML preprocessor -# --------------------------------------------------------------------------- - - -def preprocess_puml( - input_path: str, - metamodel_path: str, - output_path: str, -) -> None: - """Inline ``!include fta_metamodel.puml`` into a PlantUML file. - - Replaces the ``!include fta_metamodel.puml`` directive with the content - of the metamodel file (stripping outer ``@startuml`` / ``@enduml`` - markers so the combined file remains a valid PlantUML diagram). - - This avoids the need to colocate the metamodel alongside the diagram at - build time and eliminates fragile shell ``cp`` actions in Bazel rules. - - Args: - input_path: Path to the source ``.puml`` file. - metamodel_path: Path to ``fta_metamodel.puml``. - output_path: Path for the preprocessed output ``.puml``. - """ - metamodel = Path(metamodel_path).read_text(encoding="utf-8") - # Strip @startuml / @enduml so they don't nest inside the host diagram. - meta_lines = [ - line - for line in metamodel.splitlines(keepends=True) - if line.strip() not in ("@startuml", "@enduml") - ] - meta_content = "".join(meta_lines) - - source = Path(input_path).read_text(encoding="utf-8") - processed = source.replace("!include fta_metamodel.puml", meta_content) - - Path(output_path).write_text(processed, encoding="utf-8") - logger.info("Preprocessed %s → %s", input_path, output_path) - - -# --------------------------------------------------------------------------- -# CLI -# --------------------------------------------------------------------------- - - -def main() -> None: - parser = argparse.ArgumentParser( - description="Inline fta_metamodel.puml into FTA diagrams and extract lobster traceability.", - ) - parser.add_argument( - "--log-level", - choices=["error", "warn", "info", "debug"], - default="warn", - dest="log_level", - help="Log level for tool output (default: warn).", - ) - parser.add_argument( - "--metamodel", - required=True, - help="Path to fta_metamodel.puml to inline.", - ) - parser.add_argument( - "--output-dir", - required=True, - dest="output_dir", - help="Directory for the preprocessed .puml output files.", - ) - parser.add_argument( - "--lobster", - required=True, - help="Output .lobster traceability file path.", - ) - parser.add_argument( - "inputs", - nargs="+", - help="PlantUML FTA .puml files to process.", - ) - - args = parser.parse_args() - logging.basicConfig( - level=_LEVEL_MAP[args.log_level], format="%(levelname)s: %(message)s" - ) - _run_preprocess(args) - - -def _run_preprocess(args: argparse.Namespace) -> None: - """Preprocess each diagram (inline metamodel) and extract lobster items.""" - output_dir = Path(args.output_dir) - output_dir.mkdir(parents=True, exist_ok=True) - - all_items: list[dict] = [] - for puml_file in args.inputs: - preprocessed_path = output_dir / Path(puml_file).name - preprocess_puml(puml_file, args.metamodel, str(preprocessed_path)) - items = extract_fta_items(puml_file) - logger.info("Extracted %d item(s) from '%s'", len(items), puml_file) - all_items.extend(items) - - lobster_output = create_lobster_output(all_items) - with open(args.lobster, "w", encoding="utf-8") as fh: - json.dump(lobster_output, fh, indent=2) - fh.write("\n") - logger.info("Wrote %d lobster item(s) to %s", len(all_items), args.lobster) - - -if __name__ == "__main__": - main() diff --git a/bazel/rules/rules_score/templates/conf.template.py b/bazel/rules/rules_score/templates/conf.template.py index df5964d1..a694e406 100644 --- a/bazel/rules/rules_score/templates/conf.template.py +++ b/bazel/rules/rules_score/templates/conf.template.py @@ -205,9 +205,38 @@ def _resolve_execroot_path(path_value: str) -> str: f"Could not find plantuml binary via runfiles lookup. Searched: {searched}." ) +# Locate the FTA metamodel in the docs-build runfiles (shipped via +# //tools/sphinx:sphinx-build data) so it can be put on PlantUML's include path. +# FTA diagrams keep their ``!include fta_metamodel.puml``; sphinxcontrib-plantuml +# renders via ``-pipe`` (no source-file dir), so a relative include only resolves +# through ``plantuml.include.path``. +fta_metamodel_dir = "" +for repo_name in plantuml_repo_candidates: + candidate = r.Rlocation(f"{repo_name}/plantuml/fta_metamodel.puml", source_repo="") + if candidate and Path(candidate).exists(): + fta_metamodel_dir = str(Path(candidate).parent) + logger.info(f"Found fta_metamodel.puml on include path: {candidate}") + break + +if not fta_metamodel_dir: + logger.warning( + "fta_metamodel.puml not found in runfiles — FTA diagrams using " + "!include fta_metamodel.puml will fail to render. " + f"Searched repo candidates: {plantuml_repo_candidates}" + ) + # Use PlantUML's built-in Smetana layout engine (Java port of Graphviz). # This avoids requiring an external dot binary in the Bazel sandbox. -plantuml = f"{plantuml_path} -Playout=smetana" +# ``--jvm_flag`` is consumed by the java_binary launcher and sets the JVM system +# property PlantUML reads for its include search path. It must precede the +# program args (``-Playout`` etc.), or the launcher forwards it to PlantUML +# (which ignores it) instead of the JVM. +_include_flag = ( + f" --jvm_flag=-Dplantuml.include.path={fta_metamodel_dir}" + if fta_metamodel_dir + else "" +) +plantuml = f"{plantuml_path}{_include_flag} -Playout=smetana" plantuml_output_format = "svg_obj" # --------------------------------------------------------------------------- diff --git a/bazel/rules/rules_score/templates/fmea.template.rst b/bazel/rules/rules_score/templates/fmea.template.rst index 4afca3dd..2b03c961 100644 --- a/bazel/rules/rules_score/templates/fmea.template.rst +++ b/bazel/rules/rules_score/templates/fmea.template.rst @@ -12,11 +12,4 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* -{title} -{underline} - -{failure_modes_section} - -{root_causes_section} - -{control_measures_section} +{body} diff --git a/bazel/rules/rules_score/test/BUILD b/bazel/rules/rules_score/test/BUILD index 2c0b4e11..e20b1fa7 100644 --- a/bazel/rules/rules_score/test/BUILD +++ b/bazel/rules/rules_score/test/BUILD @@ -784,10 +784,10 @@ trlc_requirements_test( ) py_test( - name = "test_safety_analysis_tools", + name = "test_fmea_assembler", size = "small", - srcs = ["test_safety_analysis_tools.py"], - deps = ["@score_tooling//bazel/rules/rules_score:safety_analysis_tools"], + srcs = ["test_fmea_assembler.py"], + deps = ["@score_tooling//bazel/rules/rules_score:fmea_assembler"], ) py_test( @@ -825,8 +825,8 @@ test_suite( ":seooc_tests", ":sphinx_module_tests", ":test_aou_forwarding_to_lobster", + ":test_fmea_assembler", ":test_rst_to_trlc", - ":test_safety_analysis_tools", ":test_trlc_rst_image_rendering", ":unit_component_tests", "//fixtures/image_srcs:requirements_image_tests", diff --git a/bazel/rules/rules_score/test/MODULE.bazel b/bazel/rules/rules_score/test/MODULE.bazel index 81e40c0c..040ca638 100644 --- a/bazel/rules/rules_score/test/MODULE.bazel +++ b/bazel/rules/rules_score/test/MODULE.bazel @@ -97,7 +97,7 @@ register_toolchains( bazel_dep(name = "trlc", version = "0.0.0") git_override( module_name = "trlc", - commit = "c4c531b9d667085daa09dfc1590edacc314bfda4", + commit = "8d25f639ff44976893d7866ba421a04db5698ebe", remote = "https://github.com/bmw-software-engineering/trlc.git", ) diff --git a/bazel/rules/rules_score/test/fixtures/image_requirements/schema.rsl b/bazel/rules/rules_score/test/fixtures/image_requirements/schema.rsl index d6003dd4..d29dcbd2 100644 --- a/bazel/rules/rules_score/test/fixtures/image_requirements/schema.rsl +++ b/bazel/rules/rules_score/test/fixtures/image_requirements/schema.rsl @@ -13,5 +13,5 @@ package ImageTest type Requirement { - description String + description Markup_String } diff --git a/bazel/rules/rules_score/test/test_fmea_assembler.py b/bazel/rules/rules_score/test/test_fmea_assembler.py new file mode 100644 index 00000000..8b555e9b --- /dev/null +++ b/bazel/rules/rules_score/test/test_fmea_assembler.py @@ -0,0 +1,365 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +"""Unit tests for the FMEA page assembler layout logic.""" + +import json +import os +import sys +import tempfile +import unittest + +import fmea_assembler as fa + + +class _Type: + def __init__(self, name): + self.name = name + + +class _Obj: + def __init__(self, name, type_name, fields=None): + self.name = name + self.n_typ = _Type(type_name) + self._fields = fields or {} + + def to_python_dict(self): + return self._fields + + +class _FakeRenderer: + """Minimal stand-in for ``TRLCRST`` exercising the assembler layout.""" + + def __init__(self, objs): + self._objs = objs + + def objects_by_fqn(self): + return self._objs + + def render_table_to_string( + self, columns, fqns=None, name_header="Name", link_fn=None + ): + if fqns is None: + fqns = list(self._objs) + if link_fn is not None: + rows = "\n".join(link_fn(f, self._objs[f].name) for f in fqns) + else: + rows = "\n".join(self._objs[f].name for f in fqns) + return f"TABLE[{name_header}]\n{rows}\n" + + def render_records_to_string(self, fqns, fields): + return "RECORDS(" + ",".join(fqns) + ")\n" + + def field_value_for(self, fqn, field_name, records=None): + return self._objs[fqn].to_python_dict().get(field_name, "") + + +def _objs(): + return { + "Lib.FM_A": _Obj( + "FM_A", + "FailureMode", + { + "guideword": "LossOfFunction", + "safety": "B", + "interface": "Lib.Api", + "failureeffect": "world ends", + "description": "fm a description", + }, + ), + "Lib.FM_Orphan": _Obj( + "FM_Orphan", "FailureMode", {"safety": "QM", "guideword": "TooLate"} + ), + "Lib.CM_1": _Obj( + "CM_1", "ControlMeasure", {"safety": "B", "description": "cm one"} + ), + "Lib.CM_Orphan": _Obj("CM_Orphan", "ControlMeasure", {"safety": "D"}), + } + + +class AnchorTest(unittest.TestCase): + def test_anchor_is_sanitised_lowercase(self): + self.assertEqual(fa._anchor("Lib.FM_A"), "fmea-lib-fm-a") + + def test_ref_targets_anchor(self): + self.assertEqual( + fa._ref("Lib.FM_A", "FM_A"), + ":ref:`FM_A `", + ) + + +class BuildBodyTest(unittest.TestCase): + def setUp(self): + self.renderer = _FakeRenderer(_objs()) + self.chains = [ + { + "fm_fqn": "Lib.FM_A", + "fm_name": "Failure Mode A", + "puml": "fta_a.puml", + "control_measures": ["Lib.CM_1"], + } + ] + + def test_overview_and_chain_section_rendered(self): + body = fa._build_body(self.renderer, self.chains, "Title") + self.assertIn("Title\n=====", body) + self.assertIn("Overview", body) + # Top-level grouping sections. + self.assertIn("Failure Modes\n-------------", body) + # FM dropdown titled by its full fqn only (no ASIL in the heading). + self.assertIn(".. dropdown:: Lib.FM_A\n", body) + self.assertNotIn(".. dropdown:: Lib.FM_A :bdg", body) + self.assertIn(":name: fmea-lib-fm-a", body) + # Attributes as a grid of cards; no inner requirement id. + self.assertNotIn(".. requirement:definition::", body) + self.assertIn(".. grid:: 2", body) + # Guideword / ASIL are bare centred grid items (no card chrome). + self.assertIn(".. grid-item::", body) + self.assertIn(":class: sd-text-center", body) + self.assertIn(":bdg-info:`LossOfFunction`", body) + self.assertIn(":bdg-warning:`ASIL B`", body) + self.assertNotIn(".. grid-item-card:: Guideword", body) + self.assertIn(".. grid-item-card:: Interface", body) + # Description as a prominent card. + self.assertIn(".. grid-item-card:: Description", body) + self.assertIn("fm a description", body) + # Root Cause Analysis rubric over the inline FTA + per-FM CM cards. + self.assertIn(".. rubric:: Root Cause Analysis", body) + self.assertIn(".. uml:: fta_a.puml", body) + self.assertIn(".. rubric:: Control Measures", body) + # CM card: bold ID with inline ASIL badge in the card body (no card title). + self.assertIn(".. grid-item-card::\n", body) + self.assertIn("**CM_1**", body) + self.assertIn(":bdg-warning:`ASIL B`", body) + # Attribute grid uses a gutter to separate the badge row from the cards. + self.assertIn(":gutter: 3", body) + + def test_global_control_measures_table_lists_all(self): + body = fa._build_body(self.renderer, self.chains, "Title") + self.assertIn("Control Measures\n----------------", body) + self.assertIn("TABLE[Control Measure]", body) + self.assertIn("CM_1", body) + self.assertIn("CM_Orphan", body) + + def test_orphan_failure_mode_rendered_without_fta(self): + body = fa._build_body(self.renderer, self.chains, "Title") + # Orphan FM still appears as a dropdown, but with no FTA / RCA rubric. + self.assertIn(".. dropdown:: Lib.FM_Orphan\n", body) + self.assertEqual(body.count(".. rubric:: Root Cause Analysis"), 1) + + def test_no_chains_renders_all_failure_modes(self): + body = fa._build_body(self.renderer, [], "Title") + self.assertIn(".. dropdown:: Lib.FM_A\n", body) + self.assertIn(".. dropdown:: Lib.FM_Orphan\n", body) + self.assertNotIn(".. uml::", body) + + def test_chain_with_unknown_fm_fqn_logs_warning(self): + chains = [ + { + "fm_fqn": "Lib.NoSuchFM", + "fm_name": "Unknown", + "puml": "x.puml", + "control_measures": [], + } + ] + import logging as _logging + + with self.assertLogs("fmea_assembler", level=_logging.WARNING) as log: + body = fa._build_body(self.renderer, chains, "Title") + self.assertTrue(any("Lib.NoSuchFM" in m for m in log.output)) + # The unknown FM is skipped; the known FMs still render. + self.assertIn(".. dropdown:: Lib.FM_A\n", body) + self.assertNotIn(".. dropdown:: Lib.NoSuchFM\n", body) + + +class ChainValidationTest(unittest.TestCase): + """A malformed chain entry must fail loudly, not crash with a bare KeyError.""" + + def setUp(self): + self.renderer = _FakeRenderer(_objs()) + + def test_chain_missing_fm_fqn_raises_valueerror(self): + chains = [{"puml": "a.puml", "control_measures": []}] + with self.assertRaises(ValueError) as ctx: + fa._build_body(self.renderer, chains, "Title") + self.assertIn("missing required key", str(ctx.exception)) + + def test_chain_missing_puml_raises_valueerror(self): + chains = [{"fm_fqn": "Lib.FM_A", "control_measures": []}] + with self.assertRaises(ValueError): + fa._build_body(self.renderer, chains, "Title") + + +# Minimal self-contained TRLC model: defines the FailureMode / ControlMeasure +# types the assembler keys on, so main() runs a real TRLCRST parse (catching +# contract drift the _FakeRenderer cannot). +_RSL = """\ +package TestFmea + +type FailureMode { + guideword optional String + safety optional String + interface optional String + failureeffect optional String + description optional String +} + +type ControlMeasure { + safety optional String + description optional String +} +""" + +_FM_TRLC = """\ +package TestFmea + +FailureMode FmA { + guideword = "TooLate" + safety = "ASIL_D" + interface = "Lib.Api" + failureeffect = "downstream timeout" + description = "fm a description" +} +""" + +_CM_TRLC = """\ +package TestFmea + +ControlMeasure CmA { + safety = "ASIL_D" + description = "cm a description" +} +""" + + +class MainIntegrationTest(unittest.TestCase): + """End-to-end main(): real TRLCRST parse + chains JSON -> fmea.rst.""" + + def _write(self, directory, name, content): + path = os.path.join(directory, name) + with open(path, "w", encoding="utf-8") as fh: + fh.write(content) + return path + + def _run_main(self, argv): + saved = sys.argv + try: + sys.argv = argv + fa.main() + finally: + sys.argv = saved + + def test_full_page_assembled_from_real_trlc(self): + with tempfile.TemporaryDirectory() as tmp: + rsl = self._write(tmp, "types.rsl", _RSL) + fm = self._write(tmp, "fm.trlc", _FM_TRLC) + cm = self._write(tmp, "cm.trlc", _CM_TRLC) + template = self._write(tmp, "tmpl.rst", "{body}\n") + chains = self._write( + tmp, + "chains.json", + json.dumps( + [ + { + "fm_fqn": "TestFmea.FmA", + "fm_name": "Fm A", + "puml": "a.puml", + "control_measures": ["TestFmea.CmA"], + } + ] + ), + ) + out = os.path.join(tmp, "fmea.rst") + + self._run_main( + [ + "fmea_assembler", + "--output", + out, + "--template", + template, + "--title", + "Test FMEA", + "--chains", + chains, + "--failuremodes", + fm, + "--controlmeasures", + cm, + "--spec", + rsl, + ] + ) + + with open(out, encoding="utf-8") as fh: + rst = fh.read() + + # Title + overview table + FM dropdown + inline FTA + control measures. + self.assertIn("Test FMEA", rst) + self.assertIn("Overview", rst) + self.assertIn(".. list-table::", rst) + self.assertIn(".. dropdown:: TestFmea.FmA\n", rst) + self.assertIn(":name: fmea-testfmea-fma", rst) + self.assertIn(".. grid-item-card:: Description", rst) + self.assertIn(".. rubric:: Root Cause Analysis", rst) + self.assertIn(".. uml:: a.puml", rst) + self.assertIn("Control Measures", rst) + # Real rendered record content (proves TRLCRST actually parsed). + self.assertIn("fm a description", rst) + self.assertIn("cm a description", rst) + + def test_malformed_chains_json_exits_nonzero(self): + with tempfile.TemporaryDirectory() as tmp: + template = self._write(tmp, "tmpl.rst", "{body}\n") + bad = self._write(tmp, "chains.json", "{ this is not json") + out = os.path.join(tmp, "fmea.rst") + with self.assertRaises(SystemExit) as ctx: + self._run_main( + [ + "fmea_assembler", + "--output", + out, + "--template", + template, + "--title", + "T", + "--chains", + bad, + ] + ) + self.assertEqual(ctx.exception.code, 1) + + def test_missing_body_placeholder_exits_nonzero(self): + with tempfile.TemporaryDirectory() as tmp: + # Template without the required {body} placeholder. + template = self._write(tmp, "tmpl.rst", "no placeholder here\n") + chains = self._write(tmp, "chains.json", "[]") + out = os.path.join(tmp, "fmea.rst") + with self.assertRaises(SystemExit) as ctx: + self._run_main( + [ + "fmea_assembler", + "--output", + out, + "--template", + template, + "--title", + "T", + "--chains", + chains, + ] + ) + self.assertEqual(ctx.exception.code, 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/bazel/rules/rules_score/test/test_safety_analysis_tools.py b/bazel/rules/rules_score/test/test_safety_analysis_tools.py deleted file mode 100644 index c65a1a15..00000000 --- a/bazel/rules/rules_score/test/test_safety_analysis_tools.py +++ /dev/null @@ -1,363 +0,0 @@ -# ******************************************************************************* -# Copyright (c) 2026 Contributors to the Eclipse Foundation -# -# See the NOTICE file(s) distributed with this work for additional -# information regarding copyright ownership. -# -# This program and the accompanying materials are made available under the -# terms of the Apache License Version 2.0 which is available at -# https://www.apache.org/licenses/LICENSE-2.0 -# -# SPDX-License-Identifier: Apache-2.0 -# ******************************************************************************* - -"""Tests for safety_analysis_tools.""" - -import json -import logging -import os -import sys -import tempfile -import unittest - -from safety_analysis_tools import ( - LOBSTER_GENERATOR, - LOBSTER_SCHEMA, - LOBSTER_VERSION, - _is_valid_trlc_fqn, - _parse_quoted_args, - create_lobster_output, - extract_fta_items, - main, - preprocess_puml, -) - -# --------------------------------------------------------------------------- -# Fixtures -# --------------------------------------------------------------------------- - -SAMPLE_FTA = """\ -@startuml - -!include fta_metamodel.puml - -$TopEvent("Failure takes over", "Pkg.TopFailure") -$OrGate("OG1", "Pkg.TopFailure") -$BasicEvent("Bad luck", "Pkg.BadLuck", "OG1") -$IntermediateEvent("Angry", "IEF", "OG1") -$AndGate("AG2", "IEF") -$BasicEvent("No Cookies", "Pkg.NoCookies", "AG2") -$BasicEvent("No Coffee", "Pkg.NoCoffee", "AG2") - -@enduml -""" - -EMPTY_FTA = """\ -@startuml -' No events here -@enduml -""" - -SAMPLE_METAMODEL = """\ -@startuml - -' AND gate sprite -sprite $and placeholder - -!procedure $TopEvent($name, $alias) - rectangle "$name" as $alias -!endprocedure - -!procedure $BasicEvent($name, $alias, $connection) - "$name" as $alias - $alias -u-> $connection -!endprocedure - -@enduml -""" - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - - -def _write_file(content: str, directory: str, name: str) -> str: - path = os.path.join(directory, name) - with open(path, "w", encoding="utf-8") as fh: - fh.write(content) - return path - - -# --------------------------------------------------------------------------- -# _parse_quoted_args -# --------------------------------------------------------------------------- - - -class TestParseQuotedArgs(unittest.TestCase): - def test_returns_none_when_proc_absent(self): - self.assertIsNone(_parse_quoted_args("$OrGate()", "$TopEvent")) - - def test_extracts_two_args(self): - self.assertEqual( - _parse_quoted_args('$TopEvent("My name", "Pkg.Record")', "$TopEvent"), - ["My name", "Pkg.Record"], - ) - - def test_extracts_three_args(self): - self.assertEqual( - _parse_quoted_args( - '$BasicEvent("Bad luck", "Pkg.BadLuck", "OG1")', "$BasicEvent" - ), - ["Bad luck", "Pkg.BadLuck", "OG1"], - ) - - def test_returns_none_on_missing_open_paren(self): - self.assertIsNone(_parse_quoted_args("$TopEvent", "$TopEvent")) - - def test_returns_none_on_no_quoted_args(self): - self.assertIsNone(_parse_quoted_args("$TopEvent()", "$TopEvent")) - - def test_ignores_content_after_close_paren(self): - line = '$TopEvent("a", "b") $BasicEvent("c", "d", "e")' - self.assertEqual(_parse_quoted_args(line, "$TopEvent"), ["a", "b"]) - - -# --------------------------------------------------------------------------- -# _is_valid_trlc_fqn -# --------------------------------------------------------------------------- - - -class TestIsValidTrlcFqn(unittest.TestCase): - def test_valid(self): - self.assertTrue(_is_valid_trlc_fqn("Pkg.Record")) - self.assertTrue(_is_valid_trlc_fqn("SampleLib.FailureMode")) - - def test_no_dot(self): - self.assertFalse(_is_valid_trlc_fqn("NoDotsHere")) - - def test_too_many_dots(self): - self.assertFalse(_is_valid_trlc_fqn("A.B.C")) - - def test_empty_string(self): - self.assertFalse(_is_valid_trlc_fqn("")) - - def test_dot_only(self): - self.assertFalse(_is_valid_trlc_fqn(".")) - - -# --------------------------------------------------------------------------- -# extract_fta_items -# --------------------------------------------------------------------------- - - -class TestExtractFtaItems(unittest.TestCase): - def test_extracts_top_event(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(SAMPLE_FTA, tmp, "fta.puml") - items = extract_fta_items(path) - - top_events = [i for i in items if i["kind"] == "TopEvent"] - self.assertEqual(len(top_events), 1) - self.assertEqual(top_events[0]["name"], "Pkg.TopFailure") - self.assertEqual(top_events[0]["tag"], "fta Pkg.TopFailure") - self.assertIn("req Pkg.TopFailure", top_events[0]["refs"]) - - def test_extracts_basic_events(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(SAMPLE_FTA, tmp, "fta.puml") - items = extract_fta_items(path) - - basic_aliases = {i["name"] for i in items if i["kind"] == "BasicEvent"} - self.assertEqual( - basic_aliases, {"Pkg.BadLuck", "Pkg.NoCookies", "Pkg.NoCoffee"} - ) - - def test_ignores_gates_and_intermediate_events(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(SAMPLE_FTA, tmp, "fta.puml") - items = extract_fta_items(path) - self.assertEqual({i["kind"] for i in items}, {"TopEvent", "BasicEvent"}) - - def test_refs_point_to_alias(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(SAMPLE_FTA, tmp, "fta.puml") - for item in extract_fta_items(path): - self.assertIn(f"req {item['name']}", item["refs"]) - - def test_location_contains_file_and_line(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(SAMPLE_FTA, tmp, "fta.puml") - for item in extract_fta_items(path): - self.assertEqual(item["location"]["kind"], "file") - self.assertEqual(item["location"]["file"], path) - self.assertGreater(item["location"]["line"], 0) - - def test_one_match_per_line(self): - content = '$TopEvent("name", "A.B") $BasicEvent("x", "C.D", "conn")\n' - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(content, tmp, "fta.puml") - items = extract_fta_items(path) - self.assertEqual(len(items), 1) - self.assertEqual(items[0]["kind"], "TopEvent") - - def test_empty_diagram_returns_empty_list(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(EMPTY_FTA, tmp, "fta.puml") - self.assertEqual(extract_fta_items(path), []) - - def test_missing_file_raises(self): - with tempfile.TemporaryDirectory() as tmp: - with self.assertRaises(OSError): - extract_fta_items(os.path.join(tmp, "nonexistent.puml")) - - def test_invalid_alias_logs_warning(self): - content = '$TopEvent("bad alias", "NoDotsHere")\n' - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(content, tmp, "fta.puml") - with self.assertLogs(level="WARNING") as log: - extract_fta_items(path) - self.assertTrue(any("does not look like a valid" in m for m in log.output)) - - def test_valid_alias_produces_no_fqn_warning(self): - with tempfile.TemporaryDirectory() as tmp: - path = _write_file(SAMPLE_FTA, tmp, "fta.puml") - with self.assertLogs(level="WARNING") as log: - logging.getLogger().warning( - "sentinel" - ) # ensure assertLogs doesn't fail - extract_fta_items(path) - self.assertFalse(any("does not look like a valid" in m for m in log.output)) - - -# --------------------------------------------------------------------------- -# preprocess_puml -# --------------------------------------------------------------------------- - - -class TestPreprocessPuml(unittest.TestCase): - def _write_metamodel(self, directory: str) -> str: - return _write_file(SAMPLE_METAMODEL, directory, "fta_metamodel.puml") - - def test_inlines_metamodel_content(self): - with tempfile.TemporaryDirectory() as tmp: - meta = self._write_metamodel(tmp) - src = _write_file( - '@startuml\n!include fta_metamodel.puml\n$TopEvent("x", "A.B")\n@enduml\n', - tmp, - "fta.puml", - ) - out = os.path.join(tmp, "out.puml") - preprocess_puml(src, meta, out) - - with open(out) as fh: - content = fh.read() - self.assertNotIn("!include fta_metamodel.puml", content) - self.assertIn("!procedure $TopEvent", content) - self.assertIn("sprite $and", content) - self.assertIn('$TopEvent("x", "A.B")', content) - - def test_metamodel_markers_are_stripped(self): - with tempfile.TemporaryDirectory() as tmp: - meta = self._write_metamodel(tmp) - src = _write_file( - "@startuml\n!include fta_metamodel.puml\n@enduml\n", - tmp, - "fta.puml", - ) - out = os.path.join(tmp, "out.puml") - preprocess_puml(src, meta, out) - with open(out) as fh: - self.assertNotIn("@startuml\n@startuml", fh.read()) - - def test_diagram_without_include_is_unchanged(self): - original = '@startuml\n$TopEvent("x", "A.B")\n@enduml\n' - with tempfile.TemporaryDirectory() as tmp: - meta = self._write_metamodel(tmp) - src = _write_file(original, tmp, "fta.puml") - out = os.path.join(tmp, "out.puml") - preprocess_puml(src, meta, out) - with open(out) as fh: - self.assertEqual(fh.read(), original) - - -# --------------------------------------------------------------------------- -# create_lobster_output -# --------------------------------------------------------------------------- - - -class TestCreateLobsterOutput(unittest.TestCase): - def test_envelope_fields(self): - items = [{"tag": "fta A.B", "name": "A.B"}] - output = create_lobster_output(items) - self.assertEqual(output["generator"], LOBSTER_GENERATOR) - self.assertEqual(output["schema"], LOBSTER_SCHEMA) - self.assertEqual(output["version"], LOBSTER_VERSION) - self.assertEqual(output["data"], items) - - def test_empty_items(self): - self.assertEqual(create_lobster_output([])["data"], []) - - -# --------------------------------------------------------------------------- -# main (CLI integration) -# --------------------------------------------------------------------------- - - -class TestMain(unittest.TestCase): - """Integration tests for the flat CLI: --metamodel --output-dir --lobster inputs...""" - - def _run_main(self, tmp: str, *puml_paths: str) -> dict: - """Invoke main() and return the parsed lobster JSON dict.""" - meta = _write_file(SAMPLE_METAMODEL, tmp, "fta_metamodel.puml") - out_dir = os.path.join(tmp, "preprocessed") - lobster_path = os.path.join(tmp, "out.lobster") - - saved = sys.argv - try: - sys.argv = [ - "safety_analysis_tools", - "--metamodel", - meta, - "--output-dir", - out_dir, - "--lobster", - lobster_path, - *puml_paths, - ] - main() - finally: - sys.argv = saved - - with open(lobster_path, encoding="utf-8") as fh: - return json.load(fh) - - def test_produces_valid_lobster_file(self): - with tempfile.TemporaryDirectory() as tmp: - puml = _write_file(SAMPLE_FTA, tmp, "fta.puml") - data = self._run_main(tmp, puml) - self.assertEqual(data["schema"], LOBSTER_SCHEMA) - self.assertEqual(len(data["data"]), 4) # 1 TopEvent + 3 BasicEvents - - def test_produces_preprocessed_puml_files(self): - with tempfile.TemporaryDirectory() as tmp: - puml = _write_file(SAMPLE_FTA, tmp, "fta.puml") - self._run_main(tmp, puml) - - preprocessed = os.path.join(tmp, "preprocessed", "fta.puml") - self.assertTrue(os.path.exists(preprocessed)) - with open(preprocessed) as fh: - content = fh.read() - self.assertNotIn("!include fta_metamodel.puml", content) - self.assertIn("!procedure $TopEvent", content) - - def test_multiple_inputs_aggregate_lobster_items(self): - with tempfile.TemporaryDirectory() as tmp: - p1 = _write_file(SAMPLE_FTA, tmp, "a.puml") - p2 = _write_file(SAMPLE_FTA, tmp, "b.puml") - data = self._run_main(tmp, p1, p2) - self.assertEqual(len(data["data"]), 8) # 4 items × 2 files - - -if __name__ == "__main__": - unittest.main() diff --git a/plantuml/parser/puml_cli/BUILD b/plantuml/parser/puml_cli/BUILD index 45169359..ea703163 100644 --- a/plantuml/parser/puml_cli/BUILD +++ b/plantuml/parser/puml_cli/BUILD @@ -18,6 +18,7 @@ rust_binary( crate_root = "src/main.rs", visibility = ["//visibility:public"], deps = [ + "//plantuml/parser/puml_fta", "//plantuml/parser/puml_lobster", "//plantuml/parser/puml_parser", "//plantuml/parser/puml_resolver", @@ -34,6 +35,7 @@ rust_binary( "@crates//:env_logger", "@crates//:log", "@crates//:serde", + "@crates//:serde_json", ], ) @@ -42,6 +44,7 @@ rust_test( srcs = ["src/main.rs"], crate_root = "src/main.rs", deps = [ + "//plantuml/parser/puml_fta", "//plantuml/parser/puml_lobster", "//plantuml/parser/puml_parser", "//plantuml/parser/puml_resolver", @@ -58,5 +61,6 @@ rust_test( "@crates//:env_logger", "@crates//:log", "@crates//:serde", + "@crates//:serde_json", ], ) diff --git a/plantuml/parser/puml_cli/src/main.rs b/plantuml/parser/puml_cli/src/main.rs index 9562ee37..b797166c 100644 --- a/plantuml/parser/puml_cli/src/main.rs +++ b/plantuml/parser/puml_cli/src/main.rs @@ -26,10 +26,11 @@ use class_serializer::ClassSerializer; use component_serializer::ComponentSerializer; use sequence_serializer::SequenceSerializer; +use puml_fta::{lobster_document, FtaChain, FtaModel}; use puml_lobster::{write_lobster_to_file, LobsterModel}; use puml_parser::{ - DiagramParser, ErrorLocation, Preprocessor, PumlActivityParser, PumlClassParser, - PumlComponentParser, PumlSequenceParser, + DiagramParser, ErrorLocation, Preprocessor, ProcedureParserService, PumlActivityParser, + PumlClassParser, PumlComponentParser, PumlSequenceParser, }; use puml_resolver::{ ActivityResolver, ClassResolver, ComponentResolver, DiagramResolver, SequenceResolver, @@ -98,6 +99,16 @@ struct Args { /// build output set is always complete. #[arg(long)] lobster_output_dir: Option, + + /// Output directory for Fault-Tree-Analysis artifacts (optional). + /// When set, every input diagram is treated as an FTA: its + /// ``fta_metamodel.puml`` include is inlined and the metamodel-inlined + /// ``.puml`` is written to this directory, alongside an aggregated + /// ``root_causes.lobster`` (lobster-act-trace) and ``fta_chains.json`` + /// describing the per-failure-mode chains. No FlatBuffers / component + /// processing is performed in this mode. + #[arg(long)] + fta_output_dir: Option, } #[derive(Copy, Clone, ValueEnum, Debug)] @@ -132,6 +143,20 @@ fn run() -> Result<(), Box> { Builder::new() .filter_level(log_level.to_level_filter()) .init(); + if let Some(dir) = &args.fta_output_dir { + // FTA mode is a self-contained pipeline that short-circuits the normal + // FlatBuffers/lobster passes; reject co-specified output modes rather + // than silently ignoring them. + if args.fbs_output_dir.is_some() || args.lobster_output_dir.is_some() { + return Err( + "--fta-output-dir cannot be combined with --fbs-output-dir or \ + --lobster-output-dir" + .into(), + ); + } + return run_fta(&args, dir, log_level); + } + let emit_debug_json = log_level.to_level_filter() >= log::LevelFilter::Debug; let fbs_output_dir: Option = if let Some(dir) = &args.fbs_output_dir { @@ -213,6 +238,78 @@ fn run() -> Result<(), Box> { Ok(()) } +/// FTA processing pipeline: inline the metamodel, parse the fault-tree macro +/// calls, and emit the metamodel-inlined `.puml`, an aggregated +/// `root_causes.lobster`, and `fta_chains.json`. +fn run_fta( + args: &Args, + output_dir: &str, + log_level: LogLevel, +) -> Result<(), Box> { + let out = PathBuf::from(output_dir); + fs::create_dir_all(&out)?; + + let inputs = collect_files_from_args(args)?; + if inputs.is_empty() { + return Err("No valid PUML files found.".into()); + } + + // Process inputs in a deterministic order for reproducible output. + let mut sorted: Vec> = inputs.into_iter().collect(); + sorted.sort(); + + let mut all_items: Vec = Vec::new(); + let mut all_chains: Vec = Vec::new(); + // Chains/lobster items reference each diagram by basename; two inputs sharing + // a basename (in different directories) would be indistinguishable downstream. + let mut seen_basenames: HashSet = HashSet::new(); + + for file in &sorted { + let basename = file.file_name().and_then(|n| n.to_str()).ok_or_else(|| { + format!( + "input path has no valid UTF-8 file name: {}", + file.display() + ) + })?; + // The metamodel carries only `!procedure` definitions, no fault tree. + if basename == "fta_metamodel.puml" { + continue; + } + if !seen_basenames.insert(basename.to_string()) { + return Err(format!( + "duplicate FTA diagram basename {:?}: inputs in different directories \ + would be indistinguishable in {}", + basename, output_dir, + ) + .into()); + } + + // Analysis only: the fault-tree topology comes entirely from the + // `$TopEvent(...)` / `$BasicEvent(...)` / gate macro *calls*, which the + // procedure parser reads straight from the source (the `!include + // fta_metamodel.puml` line is inert text). The diagram is rendered + // as-authored by Sphinx/PlantUML, which finds the metamodel via the + // toolchain's global `plantuml.include.path`. + let source = fs::read_to_string(file.as_path())?; + let parsed = ProcedureParserService.parse_file(file, &source, log_level)?; + let model = FtaModel::from_procedure_file(&parsed)?; + all_items.extend(model.lobster_items(basename)); + all_chains.extend(model.chains(basename)); + debug!("Processed FTA diagram: {}", file.display()); + } + + let lobster = lobster_document(all_items); + fs::write( + out.join("root_causes.lobster"), + serde_json::to_string_pretty(&lobster)? + "\n", + )?; + fs::write( + out.join("fta_chains.json"), + serde_json::to_string_pretty(&all_chains)? + "\n", + )?; + Ok(()) +} + fn serialize_resolved_diagram(resolved_content: &ResolvedDiagram, source_file: &str) -> Vec { match resolved_content { ResolvedDiagram::Activity(resolved_content) => { @@ -528,3 +625,148 @@ B --> A : reply ); } } + +#[cfg(test)] +mod fta_pipeline_tests { + use super::*; + use clap::Parser; + use puml_utils::LogLevel; + + // Minimal metamodel: just enough procedure definitions for the include + // expander to inline. Topology is read from the original macro calls, so + // the bodies are irrelevant. + const TEST_METAMODEL: &str = "@startuml\n\ + !procedure $TopEvent($name, $alias)\n\ + rectangle \"$name\" as $alias\n\ + !endprocedure\n\ + !procedure $OrGate($alias, $connection)\n\ + rectangle \" \" as $alias\n\ + !endprocedure\n\ + !procedure $BasicEvent($name, $alias, $connection)\n\ + usecase \"$name\" as $alias\n\ + !endprocedure\n\ + @enduml\n"; + + fn unique_dir(tag: &str) -> PathBuf { + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let dir = + std::env::temp_dir().join(format!("puml_fta_{}_{}_{}", tag, std::process::id(), nanos)); + fs::create_dir_all(&dir).unwrap(); + dir + } + + fn write(path: &Path, content: &str) { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).unwrap(); + } + fs::write(path, content).unwrap(); + } + + fn diagram(fm_name: &str, fm_fqn: &str, cm_fqn: &str) -> String { + format!( + "@startuml\n!include fta_metamodel.puml\n\ + $TopEvent(\"{fm_name}\", \"{fm_fqn}\")\n\ + $OrGate(\"OG\", \"{fm_fqn}\")\n\ + $BasicEvent(\"a control measure\", \"{cm_fqn}\", \"OG\")\n@enduml\n", + ) + } + + fn args_for(out: &Path, files: &[&Path]) -> Args { + let mut argv: Vec = vec!["puml_cli".to_string()]; + for f in files { + argv.push("--file".to_string()); + argv.push(f.to_str().unwrap().to_string()); + } + argv.push("--fta-output-dir".to_string()); + argv.push(out.to_str().unwrap().to_string()); + Args::parse_from(argv) + } + + #[test] + fn run_fta_emits_lobster_and_chains_without_rewriting_diagram() { + let dir = unique_dir("emit"); + write(&dir.join("fta_metamodel.puml"), TEST_METAMODEL); + let a = dir.join("a.puml"); + write(&a, &diagram("FM A", "Lib.FmA", "Lib.CmA")); + let out = dir.join("out"); + + let args = args_for(&out, &[&a]); + run_fta(&args, out.to_str().unwrap(), LogLevel::Warn).expect("run_fta"); + + // FTA mode is analysis-only: it does not rewrite or emit the diagram + // (Sphinx/PlantUML renders the authored .puml, resolving the metamodel + // via the global include path). + assert!(!out.join("a.puml").exists()); + + // Lobster: act-trace envelope with a top + basic event. + let lobster: serde_json::Value = + serde_json::from_str(&fs::read_to_string(out.join("root_causes.lobster")).unwrap()) + .unwrap(); + assert_eq!(lobster["schema"], "lobster-act-trace"); + let data = lobster["data"].as_array().unwrap(); + assert_eq!(data.len(), 2); + + // Chains: one failure-mode chain carrying its control measure. + let chains: serde_json::Value = + serde_json::from_str(&fs::read_to_string(out.join("fta_chains.json")).unwrap()) + .unwrap(); + let chains = chains.as_array().unwrap(); + assert_eq!(chains.len(), 1); + assert_eq!(chains[0]["fm_fqn"], "Lib.FmA"); + assert_eq!(chains[0]["control_measures"][0], "Lib.CmA"); + + fs::remove_dir_all(&dir).ok(); + } + + #[test] + fn run_fta_aggregates_multiple_diagrams() { + let dir = unique_dir("multi"); + write(&dir.join("fta_metamodel.puml"), TEST_METAMODEL); + let a = dir.join("a.puml"); + let b = dir.join("b.puml"); + write(&a, &diagram("FM A", "Lib.FmA", "Lib.CmA")); + write(&b, &diagram("FM B", "Lib.FmB", "Lib.CmB")); + let out = dir.join("out"); + + let args = args_for(&out, &[&a, &b]); + run_fta(&args, out.to_str().unwrap(), LogLevel::Warn).expect("run_fta"); + + let chains: serde_json::Value = + serde_json::from_str(&fs::read_to_string(out.join("fta_chains.json")).unwrap()) + .unwrap(); + let fqns: Vec<&str> = chains + .as_array() + .unwrap() + .iter() + .map(|c| c["fm_fqn"].as_str().unwrap()) + .collect(); + assert!(fqns.contains(&"Lib.FmA")); + assert!(fqns.contains(&"Lib.FmB")); + + fs::remove_dir_all(&dir).ok(); + } + + #[test] + fn run_fta_rejects_basename_collision() { + let dir = unique_dir("collision"); + // Same basename "fta.puml" under two sub-directories. + let d1 = dir.join("d1"); + let d2 = dir.join("d2"); + write(&d1.join("fta_metamodel.puml"), TEST_METAMODEL); + write(&d2.join("fta_metamodel.puml"), TEST_METAMODEL); + let f1 = d1.join("fta.puml"); + let f2 = d2.join("fta.puml"); + write(&f1, &diagram("FM A", "Lib.FmA", "Lib.CmA")); + write(&f2, &diagram("FM B", "Lib.FmB", "Lib.CmB")); + let out = dir.join("out"); + + let args = args_for(&out, &[&f1, &f2]); + let err = run_fta(&args, out.to_str().unwrap(), LogLevel::Warn).unwrap_err(); + assert!(err.to_string().contains("duplicate FTA diagram basename")); + + fs::remove_dir_all(&dir).ok(); + } +} diff --git a/plantuml/parser/puml_fta/BUILD b/plantuml/parser/puml_fta/BUILD new file mode 100644 index 00000000..cbd5b6c0 --- /dev/null +++ b/plantuml/parser/puml_fta/BUILD @@ -0,0 +1,42 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") + +rust_library( + name = "puml_fta", + srcs = ["src/lib.rs"], + visibility = ["//plantuml/parser:__subpackages__"], + deps = [ + "//plantuml/parser/puml_parser:parser_core", + "//plantuml/parser/puml_parser/src/preprocessor/src/procedure:procedure_preprocessor", + "//plantuml/parser/puml_utils", + "@crates//:log", + "@crates//:serde", + "@crates//:serde_json", + "@crates//:thiserror", + ], +) + +rust_test( + name = "puml_fta_test", + crate = ":puml_fta", + deps = [ + "//plantuml/parser/puml_parser:parser_core", + "//plantuml/parser/puml_parser/src/preprocessor/src/procedure:procedure_preprocessor", + "//plantuml/parser/puml_utils", + "@crates//:log", + "@crates//:serde", + "@crates//:serde_json", + "@crates//:thiserror", + ], +) diff --git a/plantuml/parser/puml_fta/src/lib.rs b/plantuml/parser/puml_fta/src/lib.rs new file mode 100644 index 00000000..bcca3516 --- /dev/null +++ b/plantuml/parser/puml_fta/src/lib.rs @@ -0,0 +1,551 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Fault-Tree-Analysis (FTA) model and emitters. +//! +//! Consumes the procedure parser's [`ProcedureFile`] (the stream of +//! `$TopEvent(...)` / `$BasicEvent(...)` / gate macro calls produced after +//! `fta_metamodel.puml` has been inlined) and turns it into: +//! +//! * a [`lobster-act-trace`] JSON document (`root_causes.lobster`) — schema +//! compatible with the legacy `safety_analysis_tools.py` so the +//! `dependability_analysis` traceability test is unaffected, and +//! * an ordered list of *chains* (`fta_chains.json`) describing, per failure +//! mode, the inline diagram and the control measures (basic events) that +//! trace up to it — consumed by the FMEA page assembler. +//! +//! [`lobster-act-trace`]: https://github.com/bmw-software-engineering/lobster + +use std::collections::HashMap; + +use log::warn; +use procedure_preprocessor::{Arg, MacroCallDef, ProcedureFile, Statement}; +use serde::Serialize; +use serde_json::{json, Value}; + +/// Procedure macro names recognised in an FTA diagram. +const TOP_EVENT: &str = "$TopEvent"; +const INTERMEDIATE_EVENT: &str = "$IntermediateEvent"; +const BASIC_EVENT: &str = "$BasicEvent"; +const AND_GATE: &str = "$AndGate"; +const OR_GATE: &str = "$OrGate"; +const TRANSFER_IN_GATE: &str = "$TransferInGate"; + +/// The kind of a node in a fault tree. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +pub enum NodeKind { + /// `$TopEvent` — the failure mode at the root of the tree. + TopEvent, + /// `$IntermediateEvent` — a named intermediate node. + IntermediateEvent, + /// `$BasicEvent` — a leaf root cause / control measure. + BasicEvent, + /// `$AndGate`, `$OrGate`, `$TransferInGate` — a logic gate. + Gate, +} + +/// One node of a fault tree. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct FtaNode { + pub kind: NodeKind, + /// Human readable display name (events only; gates carry `None`). + pub name: Option, + /// Alias / identifier. For top and basic events this is the TRLC + /// fully-qualified name of the corresponding record. + pub alias: String, + /// Alias of the parent node this node connects upward to. `None` for the + /// top event (the root). + pub connection: Option, + /// 1-based line of the macro call in its source diagram. + /// `None` when the line is unavailable (e.g. synthesised nodes in tests). + pub line: Option, +} + +/// A fully parsed fault tree for a single diagram. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub struct FtaModel { + pub nodes: Vec, +} + +/// One failure-mode chain: the failure mode together with the control measures +/// (basic events) whose ancestry reaches it, plus the diagram to render. +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct FtaChain { + /// Fully-qualified name of the failure mode (top event alias). + pub fm_fqn: String, + /// Human readable failure mode name. + pub fm_name: String, + /// Basename of the preprocessed `.puml` diagram to render inline. + pub puml: String, + /// Fully-qualified names of the control measures for this chain, in the + /// order they appear in the diagram. + pub control_measures: Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum FtaError { + #[error("FTA macro {macro_name} requires at least {expected} argument(s)")] + MissingArgs { macro_name: String, expected: usize }, + #[error("FTA macro {macro_name} expected a string argument at position {index}")] + NonStringArg { macro_name: String, index: usize }, +} + +/// Legacy guardrail (ported from `safety_analysis_tools.py`): a TRLC +/// fully-qualified name looks like `Package.Record` — exactly two dot-separated +/// identifier segments. +fn is_valid_trlc_fqn(alias: &str) -> bool { + let parts: Vec<&str> = alias.split('.').collect(); + if parts.len() != 2 { + return false; + } + parts.iter().all(|part| { + let mut chars = part.chars(); + let first_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_'); + first_ok && chars.all(|c| c.is_ascii_alphanumeric() || c == '_') + }) +} + +fn string_arg(call: &MacroCallDef, index: usize) -> Result { + let arg = call.args.get(index).ok_or_else(|| FtaError::MissingArgs { + macro_name: call.name.clone(), + expected: index + 1, + })?; + match arg { + Arg::String(s) => Ok(s.clone()), + _ => Err(FtaError::NonStringArg { + macro_name: call.name.clone(), + index, + }), + } +} + +impl FtaModel { + /// Build a model from a parsed procedure file, ignoring procedure + /// definitions and plain text — only the macro *calls* describe topology. + pub fn from_procedure_file(file: &ProcedureFile) -> Result { + let mut nodes = Vec::new(); + for stmt in &file.stmts { + let Statement::MacroCall(call) = stmt else { + continue; + }; + let line = call.line; + let node = match call.name.as_str() { + TOP_EVENT => FtaNode { + kind: NodeKind::TopEvent, + name: Some(string_arg(call, 0)?), + alias: string_arg(call, 1)?, + connection: None, + line, + }, + INTERMEDIATE_EVENT => FtaNode { + kind: NodeKind::IntermediateEvent, + name: Some(string_arg(call, 0)?), + alias: string_arg(call, 1)?, + connection: Some(string_arg(call, 2)?), + line, + }, + BASIC_EVENT => FtaNode { + kind: NodeKind::BasicEvent, + name: Some(string_arg(call, 0)?), + alias: string_arg(call, 1)?, + connection: Some(string_arg(call, 2)?), + line, + }, + AND_GATE | OR_GATE | TRANSFER_IN_GATE => FtaNode { + kind: NodeKind::Gate, + name: None, + alias: string_arg(call, 0)?, + connection: Some(string_arg(call, 1)?), + line, + }, + // Unknown / cosmetic macros are not part of the topology. + _ => continue, + }; + // Top and basic events carry TRLC fully-qualified names that must + // resolve to requirements in the traceability chain; warn (rather + // than fail) on a malformed alias so the build still produces output. + if matches!(node.kind, NodeKind::TopEvent | NodeKind::BasicEvent) + && !is_valid_trlc_fqn(&node.alias) + { + warn!( + "FTA {} at line {}: alias {:?} is not a valid TRLC fully-qualified \ + name (expected 'Package.Record')", + call.name, + node.line.unwrap_or(0), + node.alias, + ); + } + nodes.push(node); + } + Ok(Self { nodes }) + } + + fn iter_kind(&self, kind: NodeKind) -> impl Iterator { + self.nodes.iter().filter(move |n| n.kind == kind) + } + + /// Index nodes by alias for O(1) parent lookups during the upward walk. + /// On a duplicate alias the last node wins; a warning is emitted so + /// malformed diagrams with repeated aliases are visible in the build log. + fn alias_index(&self) -> HashMap<&str, &FtaNode> { + let mut map = HashMap::with_capacity(self.nodes.len()); + for node in &self.nodes { + if let Some(prev) = map.insert(node.alias.as_str(), node) { + warn!( + "FTA diagram has duplicate alias {:?} at lines {} and {}; \ + the later definition wins", + node.alias, + prev.line.unwrap_or(0), + node.line.unwrap_or(0), + ); + } + } + map + } + + /// Resolve the top-event (failure-mode) alias a node ultimately connects to + /// by walking the `connection` parent links upward. Returns `None` when the + /// chain does not terminate at a known top event (dangling or cyclic + /// diagram). `by_alias` is the precomputed [`alias_index`], so each step is + /// O(1). + fn root_for(&self, start: &FtaNode, by_alias: &HashMap<&str, &FtaNode>) -> Option { + let mut current = start; + // Bound the walk by node count to defend against cyclic connections. + for _ in 0..=self.nodes.len() { + if current.kind == NodeKind::TopEvent { + return Some(current.alias.clone()); + } + let parent_alias = current.connection.as_deref()?; + current = by_alias.get(parent_alias).copied()?; + } + None + } + + /// Assemble ordered failure-mode chains for `puml_basename`. + /// + /// Basic events whose ancestry does not terminate at a known top event are + /// not silently discarded: each emits a `warn!` naming the alias, diagram + /// and line so a malformed fault tree is visible in the build log rather + /// than quietly losing a control measure from the safety chain. + pub fn chains(&self, puml_basename: &str) -> Vec { + let by_alias = self.alias_index(); + let mut chains: Vec = self + .iter_kind(NodeKind::TopEvent) + .map(|fm| FtaChain { + fm_fqn: fm.alias.clone(), + fm_name: fm.name.clone().unwrap_or_else(|| fm.alias.clone()), + puml: puml_basename.to_string(), + control_measures: Vec::new(), + }) + .collect(); + + for basic in self.iter_kind(NodeKind::BasicEvent) { + match self.root_for(basic, &by_alias) { + Some(root) => { + if let Some(chain) = chains.iter_mut().find(|c| c.fm_fqn == root) { + chain.control_measures.push(basic.alias.clone()); + } + } + None => warn!( + "FTA {}:{}: basic event {:?} does not connect to any top event; \ + it is dropped from every failure-mode chain", + puml_basename, + basic.line.unwrap_or(0), + basic.alias, + ), + } + } + chains + } + + /// Emit `lobster-act-trace` items for the top and basic events, mirroring + /// the legacy `safety_analysis_tools.py` schema (tag `fta `, + /// `refs: [req ]`). + pub fn lobster_items(&self, source_file: &str) -> Vec { + self.nodes + .iter() + .filter(|n| matches!(n.kind, NodeKind::TopEvent | NodeKind::BasicEvent)) + .map(|n| { + let kind = match n.kind { + NodeKind::TopEvent => "TopEvent", + NodeKind::BasicEvent => "BasicEvent", + _ => unreachable!(), + }; + json!({ + "tag": format!("fta {}", n.alias), + "location": { + "kind": "file", + "file": source_file, + "line": n.line.map(|l| json!(l)).unwrap_or(Value::Null), + "column": null, + }, + "name": n.alias, + "messages": [], + "just_up": [], + "just_down": [], + "just_global": [], + "refs": [format!("req {}", n.alias)], + "framework": "PlantUML", + "kind": kind, + }) + }) + .collect() + } +} + +/// Wrap lobster items in the standard `lobster-act-trace` envelope, sorted by +/// tag for deterministic output. +pub fn lobster_document(mut items: Vec) -> Value { + items.sort_by(|a, b| { + a["tag"] + .as_str() + .unwrap_or("") + .cmp(b["tag"].as_str().unwrap_or("")) + }); + json!({ + "data": items, + "generator": "puml_fta", + "schema": "lobster-act-trace", + "version": 3, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use parser_core::DiagramParser; + use procedure_preprocessor::{ + Arg, MacroCallDef, ProcedureFile, ProcedureParserService, Statement, + }; + use puml_utils::LogLevel; + use std::path::PathBuf; + use std::rc::Rc; + + fn mk_call(name: &str, args: &[&str], line: usize) -> Statement { + Statement::MacroCall(MacroCallDef { + name: name.to_string(), + args: args.iter().map(|a| Arg::String(a.to_string())).collect(), + line: Some(line), + }) + } + + fn model_from_stmts(stmts: Vec) -> FtaModel { + FtaModel::from_procedure_file(&ProcedureFile { stmts }).expect("fta model") + } + + const SAMPLE: &str = r#" +!procedure $TopEvent($name, $alias) + rectangle "$name" as $alias +!endprocedure +!procedure $IntermediateEvent($name, $alias, $connection) + rectangle "$name" as $alias +!endprocedure +!procedure $BasicEvent($name, $alias, $connection) + usecase "$name" as $alias +!endprocedure +!procedure $OrGate($alias, $connection) + rectangle " " as $alias +!endprocedure +!procedure $AndGate($alias, $connection) + rectangle " " as $alias +!endprocedure +$TopEvent("SampleFailureMode takes over the world", "SampleLibrary.SampleFailureMode") +$OrGate("OG1", "SampleLibrary.SampleFailureMode") +$IntermediateEvent("SampleFailureMode is Angry", "IEF", "OG1") +$BasicEvent("Just bad luck", "SampleLibrary.JustBadLuck", "OG1") +$AndGate("AG2", "IEF") +$BasicEvent("No More Cookies", "SampleLibrary.NoMoreCookies", "AG2") +$BasicEvent("No More Coffee", "SampleLibrary.NoMoreCoffee", "AG2") +"#; + + fn model_from(content: &str) -> FtaModel { + let path = Rc::new(PathBuf::from("sample_fta.puml")); + let parsed = ProcedureParserService + .parse_file(&path, content, LogLevel::Warn) + .expect("procedure parse"); + FtaModel::from_procedure_file(&parsed).expect("fta model") + } + + #[test] + fn builds_all_topology_nodes() { + let model = model_from(SAMPLE); + assert_eq!(model.iter_kind(NodeKind::TopEvent).count(), 1); + assert_eq!(model.iter_kind(NodeKind::BasicEvent).count(), 3); + assert_eq!(model.iter_kind(NodeKind::Gate).count(), 2); + assert_eq!(model.iter_kind(NodeKind::IntermediateEvent).count(), 1); + } + + #[test] + fn chain_groups_basic_events_under_failure_mode() { + let model = model_from(SAMPLE); + let chains = model.chains("sample_fta.puml"); + assert_eq!(chains.len(), 1); + let chain = &chains[0]; + assert_eq!(chain.fm_fqn, "SampleLibrary.SampleFailureMode"); + assert_eq!(chain.puml, "sample_fta.puml"); + assert_eq!( + chain.control_measures, + vec![ + "SampleLibrary.JustBadLuck", + "SampleLibrary.NoMoreCookies", + "SampleLibrary.NoMoreCoffee", + ] + ); + } + + #[test] + fn lobster_items_match_legacy_schema() { + let model = model_from(SAMPLE); + let doc = lobster_document(model.lobster_items("sample_fta.puml")); + assert_eq!(doc["schema"], "lobster-act-trace"); + assert_eq!(doc["version"], 3); + let data = doc["data"].as_array().unwrap(); + // 1 top event + 3 basic events. + assert_eq!(data.len(), 4); + let top = data + .iter() + .find(|i| i["name"] == "SampleLibrary.SampleFailureMode") + .unwrap(); + assert_eq!(top["tag"], "fta SampleLibrary.SampleFailureMode"); + assert_eq!(top["refs"][0], "req SampleLibrary.SampleFailureMode"); + assert_eq!(top["kind"], "TopEvent"); + assert_eq!(top["framework"], "PlantUML"); + } + + #[test] + fn is_valid_trlc_fqn_matches_package_record() { + assert!(is_valid_trlc_fqn("Pkg.Record")); + assert!(is_valid_trlc_fqn("_Lib.Foo_1")); + assert!(!is_valid_trlc_fqn("NoDot")); + assert!(!is_valid_trlc_fqn("A.B.C")); + assert!(!is_valid_trlc_fqn("")); + assert!(!is_valid_trlc_fqn(".")); + assert!(!is_valid_trlc_fqn("1Bad.Record")); + } + + #[test] + fn multiple_top_events_get_separate_chains() { + let chains = model_from_stmts(vec![ + mk_call(TOP_EVENT, &["FM one", "Lib.FmA"], 1), + mk_call(OR_GATE, &["OGA", "Lib.FmA"], 2), + mk_call(BASIC_EVENT, &["cm a", "Lib.CmA", "OGA"], 3), + mk_call(TOP_EVENT, &["FM two", "Lib.FmB"], 4), + mk_call(OR_GATE, &["OGB", "Lib.FmB"], 5), + mk_call(BASIC_EVENT, &["cm b", "Lib.CmB", "OGB"], 6), + ]) + .chains("d.puml"); + + assert_eq!(chains.len(), 2); + let a = chains.iter().find(|c| c.fm_fqn == "Lib.FmA").unwrap(); + assert_eq!(a.control_measures, vec!["Lib.CmA"]); + let b = chains.iter().find(|c| c.fm_fqn == "Lib.FmB").unwrap(); + assert_eq!(b.control_measures, vec!["Lib.CmB"]); + } + + #[test] + fn dangling_basic_event_is_dropped_from_chains() { + // Basic event connects to a gate alias that does not exist. + let chains = model_from_stmts(vec![ + mk_call(TOP_EVENT, &["FM", "Lib.Fm"], 1), + mk_call(BASIC_EVENT, &["cm", "Lib.Cm", "MissingGate"], 2), + ]) + .chains("d.puml"); + + assert_eq!(chains.len(), 1); + assert!(chains[0].control_measures.is_empty()); + } + + #[test] + fn cyclic_connections_terminate_without_hanging() { + // G1 -> G2 -> G1 cycle, no reachable top event. The bounded walk must + // return without looping forever. + let chains = model_from_stmts(vec![ + mk_call(OR_GATE, &["G1", "G2"], 1), + mk_call(OR_GATE, &["G2", "G1"], 2), + mk_call(BASIC_EVENT, &["cm", "Lib.Cm", "G1"], 3), + ]) + .chains("d.puml"); + + assert!(chains.is_empty()); + } + + #[test] + fn missing_argument_is_an_error() { + let file = ProcedureFile { + stmts: vec![mk_call(TOP_EVENT, &["only name"], 1)], + }; + let err = FtaModel::from_procedure_file(&file).unwrap_err(); + assert!(matches!(err, FtaError::MissingArgs { .. })); + } + + #[test] + fn non_string_argument_is_an_error() { + let file = ProcedureFile { + stmts: vec![Statement::MacroCall(MacroCallDef { + name: TOP_EVENT.to_string(), + args: vec![Arg::Number(1), Arg::String("Lib.Fm".to_string())], + line: Some(1), + })], + }; + let err = FtaModel::from_procedure_file(&file).unwrap_err(); + assert!(matches!(err, FtaError::NonStringArg { index: 0, .. })); + } + + #[test] + fn lobster_document_sorted_by_tag() { + let doc = lobster_document( + model_from_stmts(vec![ + mk_call(TOP_EVENT, &["FM", "Lib.Zeta"], 1), + mk_call(OR_GATE, &["OG", "Lib.Zeta"], 2), + mk_call(BASIC_EVENT, &["cm", "Lib.Alpha", "OG"], 3), + ]) + .lobster_items("d.puml"), + ); + let data = doc["data"].as_array().unwrap(); + let tags: Vec<&str> = data.iter().map(|i| i["tag"].as_str().unwrap()).collect(); + let mut expected = tags.clone(); + expected.sort_unstable(); + assert_eq!(tags, expected); + assert_eq!(data[0]["name"], "Lib.Alpha"); + } + + #[test] + fn lobster_items_carry_source_line() { + let items = model_from_stmts(vec![mk_call(TOP_EVENT, &["FM", "Lib.Fm"], 7)]) + .lobster_items("d.puml"); + assert_eq!(items[0]["location"]["line"], 7); + } + + #[test] + fn duplicate_alias_last_write_wins_and_basic_event_is_attributed_correctly() { + // Two top events share the same alias; only the second should be + // reachable via `alias_index`, and the basic event must end up in the + // chain for the winning (second) entry. + let chains = model_from_stmts(vec![ + mk_call(TOP_EVENT, &["FM first", "Lib.Fm"], 1), + mk_call(TOP_EVENT, &["FM second", "Lib.Fm"], 2), + mk_call(OR_GATE, &["OG", "Lib.Fm"], 3), + mk_call(BASIC_EVENT, &["cm", "Lib.Cm", "OG"], 4), + ]) + .chains("d.puml"); + // Two chain entries (one per TopEvent node), but the basic event + // connects through OG → Lib.Fm which resolves to the last-write node. + assert_eq!(chains.len(), 2); + let with_cm: Vec<_> = chains + .iter() + .filter(|c| !c.control_measures.is_empty()) + .collect(); + assert_eq!(with_cm.len(), 1); + assert_eq!(with_cm[0].control_measures, vec!["Lib.Cm"]); + } +} diff --git a/plantuml/parser/puml_parser/src/lib.rs b/plantuml/parser/puml_parser/src/lib.rs index efe59557..7226e7b4 100644 --- a/plantuml/parser/puml_parser/src/lib.rs +++ b/plantuml/parser/puml_parser/src/lib.rs @@ -21,7 +21,7 @@ pub use parser_core::{ common_ast, common_parser, Arrow, BaseParseError, DiagramParser, ErrorLocation, }; pub use preprocessor::{ - IncludeExpandError, IncludeParseError, PreprocessError, Preprocessor, ProcedureExpandError, - ProcedureParseError, + IncludeExpandError, IncludeExpander, IncludeParseError, PreprocessError, Preprocessor, + ProcedureExpandError, ProcedureFile, ProcedureParseError, ProcedureParserService, }; pub use sequence_parser::{PumlSequenceParser, SeqPumlDocument, SequenceError}; diff --git a/plantuml/parser/puml_parser/src/preprocessor/src/lib.rs b/plantuml/parser/puml_parser/src/preprocessor/src/lib.rs index eab23e91..97812e63 100644 --- a/plantuml/parser/puml_parser/src/preprocessor/src/lib.rs +++ b/plantuml/parser/puml_parser/src/preprocessor/src/lib.rs @@ -15,4 +15,7 @@ mod preprocessor; pub use include_preprocessor::{IncludeExpandError, IncludeExpander, IncludeParseError}; pub use preprocessor::{PreprocessError, Preprocessor}; -pub use procedure_preprocessor::{ProcedureExpandError, ProcedureExpander, ProcedureParseError}; +pub use procedure_preprocessor::{ + ProcedureExpandError, ProcedureExpander, ProcedureFile, ProcedureParseError, + ProcedureParserService, +}; diff --git a/plantuml/parser/puml_parser/src/preprocessor/src/procedure/lib.rs b/plantuml/parser/puml_parser/src/preprocessor/src/procedure/lib.rs index cc0c5521..fd0194f7 100644 --- a/plantuml/parser/puml_parser/src/preprocessor/src/procedure/lib.rs +++ b/plantuml/parser/puml_parser/src/preprocessor/src/procedure/lib.rs @@ -15,5 +15,8 @@ mod procedure_ast; mod procedure_expander; mod procedure_parser; +pub use procedure_ast::{ + Arg, BodyNode, MacroCallDef, ProcedureDef, ProcedureFile, Statement, TextPart, +}; pub use procedure_expander::{ProcedureExpandError, ProcedureExpander}; -pub use procedure_parser::ProcedureParseError; +pub use procedure_parser::{ProcedureParseError, ProcedureParserService}; diff --git a/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_ast.rs b/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_ast.rs index e331c0bc..fc0e6d4f 100644 --- a/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_ast.rs +++ b/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_ast.rs @@ -28,6 +28,10 @@ pub enum Statement { pub struct MacroCallDef { pub name: String, pub args: Vec, + /// 1-based line number of the call in its source file. + /// `None` when the line is unavailable (e.g. synthesised nodes). + #[serde(default)] + pub line: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_parser.rs b/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_parser.rs index 29645248..a6817f5c 100644 --- a/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_parser.rs +++ b/plantuml/parser/puml_parser/src/preprocessor/src/procedure/procedure_parser.rs @@ -112,6 +112,7 @@ fn parse_body(pair: pest::iterators::Pair) -> Vec { } fn parse_macro_call(pair: pest::iterators::Pair) -> MacroCallDef { + let (line, _col) = pair.line_col(); let mut name = String::new(); let mut args = Vec::new(); @@ -127,7 +128,11 @@ fn parse_macro_call(pair: pest::iterators::Pair) -> MacroCallDef { } } - MacroCallDef { name, args } + MacroCallDef { + name, + args, + line: Some(line), + } } fn parse_arg_list(pair: Pair) -> Vec {