From 8d66e2a2546f183e629b77f8ae24b3ac5dc09514 Mon Sep 17 00:00:00 2001 From: jacques franc Date: Thu, 21 May 2026 12:07:13 +0200 Subject: [PATCH 1/7] initial guess copilot --- .../src/geos/mesh_doctor/actions/mapMFD.py | 448 ++++++++++++++++++ .../geos/mesh_doctor/parsing/mapMFDParsing.py | 30 ++ mesh-doctor/tests/test_MFD.py | 17 + 3 files changed, 495 insertions(+) create mode 100644 mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py create mode 100644 mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py create mode 100644 mesh-doctor/tests/test_MFD.py diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py new file mode 100644 index 000000000..a53efad1f --- /dev/null +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -0,0 +1,448 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Your Name +from dataclasses import dataclass +from enum import StrEnum +from typing import Any, Dict, List, Tuple +import numpy as np +from vtkmodules.util.numpy_support import vtk_to_numpy, numpy_to_vtk +import vtk + +from geos.mesh.io.vtkIO import writeMesh, VtkOutput, readUnstructuredGrid +from geos.mesh_doctor.parsing.cliParsing import setupLogger + + +class IPType(StrEnum): + TPFA = "TPFA" + QTPFA = "QTPFA" + BdLVM = "BdLVM" + + +@dataclass(frozen=True) +class Face: + indexes: List[int] + center: List[float] + normal: List[float] + area: float + + +VTK_TETRA_FACES = [ + [0, 1, 3], + [1, 2, 3], + [0, 3, 2], + [0, 2, 1], +] + +VTK_VOXEL_FACES = [ + [0, 2, 6, 4], + [1, 3, 7, 5], + [0, 1, 5, 4], + [2, 3, 7, 6], + [0, 1, 3, 2], + [4, 5, 7, 6], +] + +VTK_HEXAHEDRON_FACES = [ + [0, 3, 2, 1], + [4, 5, 6, 7], + [0, 1, 5, 4], + [2, 3, 7, 6], + [0, 4, 7, 3], + [1, 2, 6, 5], +] + +VTK_WEDGE_FACES = [ + [0, 1, 2], + [3, 5, 4], + [0, 3, 4, 1], + [1, 4, 5, 2], + [0, 2, 5, 3], +] + +VTK_PYRAMID_FACES = [ + [0, 3, 2, 1], + [0, 1, 4], + [1, 2, 4], + [2, 3, 4], + [3, 0, 4], +] + +_FACE_TABLE: Dict[int, List[List[int]]] = { + vtk.VTK_TETRA: VTK_TETRA_FACES, + vtk.VTK_VOXEL: VTK_VOXEL_FACES, + vtk.VTK_HEXAHEDRON: VTK_HEXAHEDRON_FACES, + vtk.VTK_WEDGE: VTK_WEDGE_FACES, + vtk.VTK_PYRAMID: VTK_PYRAMID_FACES, +} + + +def get_cell_faces(cell: vtk.vtkCell) -> List[List[int]]: + cell_type = cell.GetCellType() + if cell_type not in _FACE_TABLE: + unknown_name = vtk.vtkCellTypes.GetClassNameFromTypeId(cell_type) or str(cell_type) + raise ValueError(f"Unsupported cell type '{unknown_name}' (id={cell_type}).") + local_to_global = [cell.GetPointId(i) for i in range(cell.GetNumberOfPoints())] + return [[local_to_global[local_idx] for local_idx in face] for face in _FACE_TABLE[cell_type]] + + +def _filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: + volumeIds = vtk.vtkIdTypeArray() + for i in range(mesh.GetNumberOfCells()): + dim = mesh.GetCell(i).GetCellDimension() + if dim == 3: + volumeIds.InsertNextValue(i) + + sn = vtk.vtkSelectionNode() + sn.SetFieldType(vtk.vtkSelectionNode.CELL) + sn.SetContentType(vtk.vtkSelectionNode.INDICES) + sn.SetSelectionList(volumeIds) + + sel = vtk.vtkSelection() + sel.AddNode(sn) + + ext = vtk.vtkExtractSelection() + ext.SetInputData(0, mesh) + ext.SetInputData(1, sel) + ext.Update() + return ext.GetOutput() + + +def attach_matrix_as_multicomponent(mesh: vtk.vtkDataSet, matrices: List[np.ndarray], field_name: str = "MatrixField", on_points: bool = False) -> vtk.vtkDataSet: + NF = matrices[0].shape[0] + flat = np.array([m.flatten() for m in matrices], dtype=np.float64) + vtk_arr = numpy_to_vtk(flat, deep=True, array_type=vtk.VTK_DOUBLE) + vtk_arr.SetName(field_name) + vtk_arr.SetNumberOfComponents(NF * NF) + for i in range(1, NF + 1): + for j in range(1, NF + 1): + comp_idx = (i - 1) * NF + (j - 1) + vtk_arr.SetComponentName(comp_idx, f"{i}/{j}") + data_store = mesh.GetPointData() if on_points else mesh.GetCellData() + data_store.AddArray(vtk_arr) + return mesh + + +def attach_results(mesh: vtk.vtkDataSet, matrices: List[Tuple[float, float, float, float]], field_name: str = "MatrixField") -> vtk.vtkDataSet: + flat = np.array([np.asarray(m, dtype=np.float64) for m in matrices]) + vtk_arr = numpy_to_vtk(flat, deep=True, array_type=vtk.VTK_DOUBLE) + vtk_arr.SetName(field_name) + vtk_arr.SetNumberOfComponents(4) + vtk_arr.SetComponentName(0, "condM") + vtk_arr.SetComponentName(1, "condMr") + vtk_arr.SetComponentName(2, "lambda_m") + vtk_arr.SetComponentName(3, "lambda_M") + mesh.GetCellData().AddArray(vtk_arr) + return mesh + + +class ComputeMFD: + def __init__(self, mesh): + self.faces, self.face2cell = ComputeMFD.compute_newell(mesh) + self.cell_centers = ComputeMFD.compute_cell_centroids(mesh) + + def set_IP(self, ip_type: IPType): + self.ip_type = ip_type + + def compute(self, mesh): + if self.ip_type == IPType.TPFA: + return self.compute_tpfa(mesh) + elif self.ip_type == IPType.QTPFA: + return self.compute_quasitpfa(mesh) + elif self.ip_type == IPType.BdLVM: + return self.compute_bdlvm(mesh) + else: + raise ValueError(f"Unsupported IP type: {self.ip_type}") + + def compute_tpfa(self, mesh) -> list[np.ndarray]: + perm = vtk_to_numpy(mesh.GetCellData().GetArray("Permeability")) + cell2face = {} + [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] + ncells = len(self.cell_centers) + M = [None] * ncells + face_centers = np.array([self.faces[k].center for k in self.face2cell]) + face_normals = np.array([self.faces[k].normal for k in self.face2cell]) + face_area = np.array([self.faces[k].area for k in self.face2cell]) + + def process_cell(cell): + face_indices = cell2face.get(cell) + nfacesPerCell = len(face_indices) + Mloc = np.zeros((nfacesPerCell, nfacesPerCell)) + face_cell_vec = np.ndarray((ncells, nfacesPerCell, 3)) + face_cell_dist = np.ndarray((ncells, nfacesPerCell)) + face_cell_vec[cell, :, :] = face_centers[face_indices, :] - self.cell_centers[cell, :] + face_cell_dist[cell, :] = np.linalg.norm(face_cell_vec[cell, :, :], axis=1) + face_cell_vec[cell, :, :] /= face_cell_dist[cell, :].reshape(nfacesPerCell, 1) + + face_normals[face_indices, :] = ComputeMFD.reorient_normal(face_normals[face_indices, :], face_cell_vec[cell, :, :]) + T = np.einsum('ni,ni,ni->n', face_cell_vec[cell, :, :], np.tile(perm[cell, :], (nfacesPerCell, 1)), face_normals[face_indices, :]) + T *= face_area[face_indices] / face_cell_dist[cell, :] + Mloc[range(nfacesPerCell), range(nfacesPerCell)] = 1 / T + return cell, Mloc + + from concurrent.futures import ThreadPoolExecutor, as_completed + total = len(cell2face) + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(process_cell, i) for i in range(total)] + results = [future.result() for future in as_completed(futures)] + + for cell, Mloc in results: + M[cell] = Mloc + return M + + def compute_quasitpfa(self, mesh): + perm = vtk_to_numpy(mesh.GetCellData().GetArray("Permeability")) + invperm = 1.0 / perm + vol = vtk_to_numpy(mesh.GetCellData().GetArray("Volume")) + faces, self.face2cell = ComputeMFD.compute_newell(mesh) + cell2face = {} + [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] + cell_centers = ComputeMFD.compute_cell_centroids(mesh) + ncells = len(cell_centers) + M = [None] * ncells + face_centers = np.array([faces[k].center for k in self.face2cell]) + face_normals = np.array([faces[k].normal for k in self.face2cell]) + face_area = np.array([faces[k].area for k in self.face2cell]) + + def process_cell(cell): + face_indices = cell2face.get(cell) + nf = len(face_indices) + fc_vec = face_centers[face_indices] - cell_centers[cell] + c = fc_vec + loc_normals = ComputeMFD.reorient_normal(face_normals[face_indices], c) + n = loc_normals * face_area[face_indices, None] + K = np.tile(perm[cell], (nf, 1)) + Kinv = np.tile(invperm[cell], (nf, 1)) + Mloc = np.einsum('ni,ni,mi->nm', c, Kinv, c) / vol[cell] + w = np.einsum('ni,ni,ni->n', n, K, n) + Q, _ = np.linalg.qr(n, mode='reduced') + proj = np.eye(nf) - np.einsum('ni,mi->nm', Q, Q) + S = np.einsum('ij,j,jk->ik', proj, w, proj) + S = (vol[cell] / 2.0) * S + indicators = ComputeMFD._get_indicators(proj, Mloc, S) + return (cell, *indicators) + + from concurrent.futures import ThreadPoolExecutor, as_completed + total = len(cell2face) + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(process_cell, i) for i in range(total)] + results = [future.result() for future in as_completed(futures)] + + for cell, c, cr, lm, lM in results: + M[cell] = (c, cr, lm, lM) + return M + + def compute_bdlvm(self, mesh): + perm = vtk_to_numpy(mesh.GetCellData().GetArray("Permeability")) + cell2face = {} + [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] + ncells = len(self.cell_centers) + M = [None] * ncells + face_centers = np.array([self.faces[k].center for k in self.face2cell]) + face_normals = np.array([self.faces[k].normal for k in self.face2cell]) + face_area = np.array([self.faces[k].area for k in self.face2cell]) + + def process_cell(cell): + face_indices = cell2face.get(cell) + nf = len(face_indices) + gamma = 1.0 / nf + fc_vec = face_centers[face_indices] - self.cell_centers[cell] + area = face_area[face_indices] + c = np.einsum('ni,n->ni', fc_vec, area) + n = face_normals[face_indices] + K = np.tile(perm[cell], (nf, 1)) + n = np.einsum('ni,ni->ni', n, K) + CtN_inv = np.linalg.pinv(c.T @ n) + M0 = np.einsum('ni,ij,mj->nm', c, CtN_inv, c) + NtN_inv = np.linalg.pinv(n.T @ n) + proj = np.eye(nf) - np.einsum('ni,ij,mj->nm', n, NtN_inv, n) + Mloc = M0 + S = gamma * proj + Mloc = np.einsum('ij,i,j->ij', Mloc, 1.0 / area, 1.0 / area) + S = np.einsum('ij,i,j->ij', S, 1.0 / area, 1.0 / area) + indicators = ComputeMFD._get_indicators(proj, Mloc, S) + return (cell, *indicators) + + from concurrent.futures import ThreadPoolExecutor, as_completed + total = len(cell2face) + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(process_cell, i) for i in range(total)] + results = [future.result() for future in as_completed(futures)] + + for cell, c, cr, lm, lM in results: + M[cell] = (c, cr, lm, lM) + return M + + @staticmethod + def _get_indicators(K, M, S, compute_eigs=True): + Sr = K.T @ S @ K + Mr = K.T @ M @ K + if compute_eigs: + try: + lambdas = np.linalg.eigvals(np.linalg.solve(Mr, Sr)) + except Exception: + lambdas = np.linalg.eigvals(np.linalg.pinv(Mr) @ Sr) + else: + lambdas = [] + return np.linalg.cond(M + S), np.linalg.cond(Mr + Sr), np.min(lambdas.real), np.max(lambdas.real) + + @staticmethod + def centroid_3d_polygon(mesh, point_indices: List[int], area_tolerance: float = 0.0): + n = len(point_indices) + if n < 2: + raise ValueError(f"Polygon must have at least 2 points, got {n}.") + points = vtk_to_numpy(mesh.GetPoints().GetData()) + origin = points[point_indices[0]].copy() + normal = np.zeros(3) + center = np.zeros(3) + for a in range(n): + current = points[point_indices[a]] - origin + next_pt = points[point_indices[(a + 1) % n]] - origin + cross = np.cross(current, next_pt) + normal += cross + center += next_pt + area = np.linalg.norm(normal) + center = center / n + origin + if area > area_tolerance: + normal /= area + area *= 0.5 + elif area < -area_tolerance: + raise ValueError("Negative area found") + else: + return center, normal, 0.0 + return center, normal, area + + @staticmethod + def compute_newell(mesh): + faces = {} + face2cell = {} + + def process_cell(cellid): + local_faces = [] + local_map = [] + list_indexes = get_cell_faces(mesh.GetCell(cellid)) + for indexes in list_indexes: + center, normal, area = ComputeMFD.centroid_3d_polygon(mesh, indexes) + key = tuple(sorted(indexes)) + local_faces.append((key, indexes, center, normal, area)) + local_map.append((key, cellid)) + return local_faces, local_map + + next_index = 0 + from concurrent.futures import ThreadPoolExecutor, as_completed + total = mesh.GetNumberOfCells() + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(process_cell, i) for i in range(total)] + results = [future.result() for future in as_completed(futures)] + + face_lookup = {} + for local_faces, local_map in results: + for key, indexes, center, normal, area in local_faces: + if key not in face_lookup: + face_lookup[key] = next_index + faces[next_index] = Face(indexes=indexes, center=center, normal=normal, area=area) + next_index += 1 + for key, cellid in local_map: + face_index = face_lookup[key] + face2cell.setdefault(face_index, []).append(cellid) + + return faces, face2cell + + @staticmethod + def reorient_normal(normals, cell2vec): + flag = np.einsum('ni,ni->n', normals, cell2vec) < 0 + normals[flag] = -normals[flag] + return normals + + @staticmethod + def compute_cell_centroids(mesh) -> np.ndarray: + vtkCellCenters = vtk.vtkCellCenters() + vtkCellCenters.SetInputData(mesh) + vtkCellCenters.Update() + return vtk_to_numpy(vtkCellCenters.GetOutput().GetPoints().GetData()) + + +def create_hex_grid(nx=2, ny=2, nz=2) -> vtk.vtkUnstructuredGrid: + mesh = vtk.vtkUnstructuredGrid() + points = vtk.vtkPoints() + for k in range(nz + 1): + for j in range(ny + 1): + for i in range(nx + 1): + points.InsertNextPoint(i / nx, j / ny, k / nz) + mesh.SetPoints(points) + + def pid(i, j, k): + return k * (ny + 1) * (nx + 1) + j * (nx + 1) + i + + for k in range(nz): + for j in range(ny): + for i in range(nx): + ids = vtk.vtkIdList() + for node in [ + pid(i, j, k), + pid(i + 1, j, k), + pid(i + 1, j + 1, k), + pid(i, j + 1, k), + pid(i, j, k + 1), + pid(i + 1, j, k + 1), + pid(i + 1, j + 1, k + 1), + pid(i, j + 1, k + 1), + ]: + ids.InsertNextId(node) + mesh.InsertNextCell(vtk.VTK_HEXAHEDRON, ids) + return add_permeability(mesh) + + +def add_permeability(mesh): + perm = np.ones((mesh.GetNumberOfCells(), 3)) + vtk_perm = numpy_to_vtk(perm, array_type=vtk.VTK_UNSIGNED_INT) + vtk_perm.SetName("Permeability") + mesh.GetCellData().AddArray(vtk_perm) + return mesh + + +def add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: + quality = vtk.vtkMeshQuality() + quality.SetInputData(mesh) + quality.SetHexQualityMeasureToVolume() + quality.SetTetQualityMeasureToVolume() + quality.SetWedgeQualityMeasureToVolume() + quality.SetPyramidQualityMeasureToVolume() + quality.Update() + quality_array = vtk_to_numpy(quality.GetOutput().GetCellData().GetArray("Quality")) + volume_array = numpy_to_vtk(quality_array, deep=True) + volume_array.SetName("Volume") + mesh.GetCellData().AddArray(volume_array) + return mesh + + +@dataclass(frozen=True) +class Options: + vtkOutput: VtkOutput + ip: str + + +@dataclass(frozen=True) +class Result: + info: str + + +def meshAction(mesh, options: Options) -> Result: + mfd = ComputeMFD(mesh) + try: + mfd.set_IP(IPType[options.ip]) + except Exception: + raise ValueError(f"Unsupported IP type: {options.ip}") + + res = mfd.compute(mesh) + try: + mesh = attach_results(mesh, res, f"{options.ip}_Results") + except Exception: + mesh = attach_matrix_as_multicomponent(mesh, res, f"{options.ip}_Results") + + writeMesh(mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger) + return Result(info=f"MFD {options.ip} computed and written to {options.vtkOutput.output}") + + +def action(vtuInputFile: str, options: Options) -> Result: + mesh = readUnstructuredGrid(vtuInputFile) + return meshAction(mesh, options) diff --git a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py new file mode 100644 index 000000000..521df6aec --- /dev/null +++ b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py @@ -0,0 +1,30 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Your Name +from __future__ import annotations +from argparse import _SubParsersAction +from typing import Any + +from geos.mesh_doctor.parsing.cliParsing import setupLogger, addVtuInputFileArgument +from geos.mesh_doctor.parsing import vtkOutputParsing +from geos.mesh_doctor.actions.mapMFD import Options, Result + + +__IP = "ip" + + +def convert( parsedOptions: dict[ str, Any ] ) -> Options: + vtkOut = vtkOutputParsing.convert( parsedOptions ) + ip = parsedOptions[ __IP ] + return Options( vtkOutput=vtkOut, ip=ip ) + + +def fillSubparser( subparsers: _SubParsersAction[ Any ] ) -> None: + p = subparsers.add_parser( "map-mfd", help="Compute MFD indicators and attach results to a VTU" ) + addVtuInputFileArgument( p, required=False ) + p.add_argument( "--" + __IP, type=str, choices=("TPFA", "QTPFA", "BdLVM"), required=True ) + vtkOutputParsing.fillVtkOutputSubparser( p ) + + +def displayResults( options: Options, result: Result ) -> None: + setupLogger.info( result.info ) diff --git a/mesh-doctor/tests/test_MFD.py b/mesh-doctor/tests/test_MFD.py new file mode 100644 index 000000000..0a9c927f4 --- /dev/null +++ b/mesh-doctor/tests/test_MFD.py @@ -0,0 +1,17 @@ +from geos.mesh_doctor.actions.mapMFD import create_hex_grid, add_cell_volumes, meshAction, Options +from geos.mesh.io.vtkIO import VtkOutput + + +def test_mfd_indicators_hex_grid(tmp_path): + mesh = create_hex_grid(1, 1, 1) + mesh = add_cell_volumes(mesh) + + for ip in ["QTPFA", "BdLVM"]: + out = tmp_path / f"mfd_{ip}.vtu" + options = Options(vtkOutput=VtkOutput(output=str(out), isDataModeBinary=True), ip=ip) + meshAction(mesh, options) + + arr = mesh.GetCellData().GetArray(f"{ip}_Results") + assert arr is not None + assert arr.GetNumberOfTuples() == mesh.GetNumberOfCells() + assert out.exists() From 23877c0371c43f507ccd8f3b8bd2e224d4d9e816 Mon Sep 17 00:00:00 2001 From: jacques franc Date: Tue, 26 May 2026 10:22:59 +0200 Subject: [PATCH 2/7] comment out TPFA (not interfaced) + docs --- .../src/geos/mesh_doctor/actions/mapMFD.py | 271 +++++++++--------- .../geos/mesh_doctor/parsing/mapMFDParsing.py | 27 +- mesh-doctor/tests/test_MFD.py | 52 +++- 3 files changed, 211 insertions(+), 139 deletions(-) diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py index a53efad1f..de32c6fb9 100644 --- a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. -# SPDX-FileContributor: Your Name +# SPDX-FileContributor: Jacques Franc, Copilot from dataclasses import dataclass from enum import StrEnum from typing import Any, Dict, List, Tuple @@ -11,15 +11,14 @@ from geos.mesh.io.vtkIO import writeMesh, VtkOutput, readUnstructuredGrid from geos.mesh_doctor.parsing.cliParsing import setupLogger - class IPType(StrEnum): - TPFA = "TPFA" + """Interface Pressure" type for MFD indicators.""" + # TPFA = "TPFA" QTPFA = "QTPFA" BdLVM = "BdLVM" - - @dataclass(frozen=True) class Face: + """Represents a face of a cell in the mesh, defined by its vertex indexes, center, normal vector, and area.""" indexes: List[int] center: List[float] normal: List[float] @@ -77,6 +76,7 @@ class Face: def get_cell_faces(cell: vtk.vtkCell) -> List[List[int]]: + """Returns the list of faces for a given cell, where each face is represented by a list of vertex indexes.""" cell_type = cell.GetCellType() if cell_type not in _FACE_TABLE: unknown_name = vtk.vtkCellTypes.GetClassNameFromTypeId(cell_type) or str(cell_type) @@ -85,7 +85,9 @@ def get_cell_faces(cell: vtk.vtkCell) -> List[List[int]]: return [[local_to_global[local_idx] for local_idx in face] for face in _FACE_TABLE[cell_type]] -def _filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: +# TODO: Refactor the ComputeMFD class to separate the computation of indicators from the mesh processing logic, and to allow for more flexible input parameters (e.g., different permeability fields, additional indicators, etc.). +def __filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: + """Filters the input mesh to retain only volume cells (3D cells) and returns the resulting mesh.""" volumeIds = vtk.vtkIdTypeArray() for i in range(mesh.GetNumberOfCells()): dim = mesh.GetCell(i).GetCellDimension() @@ -106,97 +108,107 @@ def _filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: ext.Update() return ext.GetOutput() - -def attach_matrix_as_multicomponent(mesh: vtk.vtkDataSet, matrices: List[np.ndarray], field_name: str = "MatrixField", on_points: bool = False) -> vtk.vtkDataSet: - NF = matrices[0].shape[0] - flat = np.array([m.flatten() for m in matrices], dtype=np.float64) - vtk_arr = numpy_to_vtk(flat, deep=True, array_type=vtk.VTK_DOUBLE) - vtk_arr.SetName(field_name) - vtk_arr.SetNumberOfComponents(NF * NF) - for i in range(1, NF + 1): - for j in range(1, NF + 1): - comp_idx = (i - 1) * NF + (j - 1) - vtk_arr.SetComponentName(comp_idx, f"{i}/{j}") - data_store = mesh.GetPointData() if on_points else mesh.GetCellData() - data_store.AddArray(vtk_arr) - return mesh - - -def attach_results(mesh: vtk.vtkDataSet, matrices: List[Tuple[float, float, float, float]], field_name: str = "MatrixField") -> vtk.vtkDataSet: +def __attach_results(mesh: vtk.vtkDataSet, matrices: List[Tuple[float, float, float, float, float, float]], field_name: str = "MatrixField") -> vtk.vtkDataSet: + """Attaches the computed MFD indicators to the mesh as a new cell data array with the specified field name.""" flat = np.array([np.asarray(m, dtype=np.float64) for m in matrices]) vtk_arr = numpy_to_vtk(flat, deep=True, array_type=vtk.VTK_DOUBLE) vtk_arr.SetName(field_name) - vtk_arr.SetNumberOfComponents(4) + vtk_arr.SetNumberOfComponents(6) vtk_arr.SetComponentName(0, "condM") - vtk_arr.SetComponentName(1, "condMr") + vtk_arr.SetComponentName(1, "consistency") vtk_arr.SetComponentName(2, "lambda_m") vtk_arr.SetComponentName(3, "lambda_M") + vtk_arr.SetComponentName(4, "idempotence") + vtk_arr.SetComponentName(5, "orthogonality") mesh.GetCellData().AddArray(vtk_arr) return mesh - class ComputeMFD: - def __init__(self, mesh): + """Class responsible for computing MFD indicators for a given mesh and permeability field, based on the specified interface pressure type (TPFA, QTPFA, or BdLVM).""" + def __init__(self, mesh, permeability_field: str = "Permeability"): + """Initializes the ComputeMFD instance by computing the faces, cell centers, and permeability field from the input mesh.""" self.faces, self.face2cell = ComputeMFD.compute_newell(mesh) - self.cell_centers = ComputeMFD.compute_cell_centroids(mesh) + self.cell_centers = ComputeMFD.__compute_cell_centroids(mesh) + # make sure that we have always Volume and don't act on surfaces or lines + mesh = __filterVolumeCells(mesh) + mesh = __add_cell_volumes(mesh) + self.permeability_field = permeability_field def set_IP(self, ip_type: IPType): + """Sets the interface pressure type for the MFD computation. + + Args: + ip_type (IPType): The interface pressure type to use for the MFD computation (TPFA, QTPFA, or BdLVM). + """ self.ip_type = ip_type def compute(self, mesh): - if self.ip_type == IPType.TPFA: - return self.compute_tpfa(mesh) - elif self.ip_type == IPType.QTPFA: + """Computes the MFD indicators for the input mesh based on the specified interface pressure type and returns the results as a list of tuples containing the indicators for each cell. + + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators. + """ + # if self.ip_type == IPType.TPFA: + # return self.compute_tpfa(mesh) + if self.ip_type == IPType.QTPFA: return self.compute_quasitpfa(mesh) elif self.ip_type == IPType.BdLVM: return self.compute_bdlvm(mesh) else: raise ValueError(f"Unsupported IP type: {self.ip_type}") - def compute_tpfa(self, mesh) -> list[np.ndarray]: - perm = vtk_to_numpy(mesh.GetCellData().GetArray("Permeability")) - cell2face = {} - [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] - ncells = len(self.cell_centers) - M = [None] * ncells - face_centers = np.array([self.faces[k].center for k in self.face2cell]) - face_normals = np.array([self.faces[k].normal for k in self.face2cell]) - face_area = np.array([self.faces[k].area for k in self.face2cell]) - - def process_cell(cell): - face_indices = cell2face.get(cell) - nfacesPerCell = len(face_indices) - Mloc = np.zeros((nfacesPerCell, nfacesPerCell)) - face_cell_vec = np.ndarray((ncells, nfacesPerCell, 3)) - face_cell_dist = np.ndarray((ncells, nfacesPerCell)) - face_cell_vec[cell, :, :] = face_centers[face_indices, :] - self.cell_centers[cell, :] - face_cell_dist[cell, :] = np.linalg.norm(face_cell_vec[cell, :, :], axis=1) - face_cell_vec[cell, :, :] /= face_cell_dist[cell, :].reshape(nfacesPerCell, 1) - - face_normals[face_indices, :] = ComputeMFD.reorient_normal(face_normals[face_indices, :], face_cell_vec[cell, :, :]) - T = np.einsum('ni,ni,ni->n', face_cell_vec[cell, :, :], np.tile(perm[cell, :], (nfacesPerCell, 1)), face_normals[face_indices, :]) - T *= face_area[face_indices] / face_cell_dist[cell, :] - Mloc[range(nfacesPerCell), range(nfacesPerCell)] = 1 / T - return cell, Mloc - - from concurrent.futures import ThreadPoolExecutor, as_completed - total = len(cell2face) - with ThreadPoolExecutor(max_workers=8) as executor: - futures = [executor.submit(process_cell, i) for i in range(total)] - results = [future.result() for future in as_completed(futures)] - - for cell, Mloc in results: - M[cell] = Mloc - return M + # def compute_tpfa(self, mesh) -> list[np.ndarray]: + # """Computes the MFD indicators using the TPFA method for the input mesh and returns the results as a list of tuples containing the indicators for each cell. + # Args: + # mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the TPFA method. + # """ + # perm = vtk_to_numpy(mesh.GetCellData().GetArray(self.permeability_field)) + # cell2face = {} + # [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] + # ncells = len(self.cell_centers) + # M = [None] * ncells + # face_centers = np.array([self.faces[k].center for k in self.face2cell]) + # face_normals = np.array([self.faces[k].normal for k in self.face2cell]) + # face_area = np.array([self.faces[k].area for k in self.face2cell]) + + # def process_cell(cell): + # face_indices = cell2face.get(cell) + # nfacesPerCell = len(face_indices) + # Mloc = np.zeros((nfacesPerCell, nfacesPerCell)) + # face_cell_vec = np.ndarray((ncells, nfacesPerCell, 3)) + # face_cell_dist = np.ndarray((ncells, nfacesPerCell)) + # face_cell_vec[cell, :, :] = face_centers[face_indices, :] - self.cell_centers[cell, :] + # face_cell_dist[cell, :] = np.linalg.norm(face_cell_vec[cell, :, :], axis=1) + # face_cell_vec[cell, :, :] /= face_cell_dist[cell, :].reshape(nfacesPerCell, 1) + + # face_normals[face_indices, :] = ComputeMFD.__reorient_normal(face_normals[face_indices, :], face_cell_vec[cell, :, :]) + # T = np.einsum('ni,ni,ni->n', face_cell_vec[cell, :, :], np.tile(perm[cell, :], (nfacesPerCell, 1)), face_normals[face_indices, :]) + # T *= face_area[face_indices] / face_cell_dist[cell, :] + # Mloc[range(nfacesPerCell), range(nfacesPerCell)] = 1 / T + # return cell, Mloc + + # from concurrent.futures import ThreadPoolExecutor, as_completed + # total = len(cell2face) + # with ThreadPoolExecutor(max_workers=8) as executor: + # futures = [executor.submit(process_cell, i) for i in range(total)] + # results = [future.result() for future in as_completed(futures)] + + # for cell, Mloc in results: + # M[cell] = Mloc + # return M def compute_quasitpfa(self, mesh): - perm = vtk_to_numpy(mesh.GetCellData().GetArray("Permeability")) + """Computes the MFD indicators using the QTPFA method for the input mesh and returns the results as a list of tuples containing the indicators for each cell. + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the QTPFA method. + """ + perm = vtk_to_numpy(mesh.GetCellData().GetArray(self.permeability_field)) invperm = 1.0 / perm vol = vtk_to_numpy(mesh.GetCellData().GetArray("Volume")) faces, self.face2cell = ComputeMFD.compute_newell(mesh) cell2face = {} [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] - cell_centers = ComputeMFD.compute_cell_centroids(mesh) + cell_centers = ComputeMFD.__compute_cell_centroids(mesh) ncells = len(cell_centers) M = [None] * ncells face_centers = np.array([faces[k].center for k in self.face2cell]) @@ -208,7 +220,7 @@ def process_cell(cell): nf = len(face_indices) fc_vec = face_centers[face_indices] - cell_centers[cell] c = fc_vec - loc_normals = ComputeMFD.reorient_normal(face_normals[face_indices], c) + loc_normals = ComputeMFD.__reorient_normal(face_normals[face_indices], c) n = loc_normals * face_area[face_indices, None] K = np.tile(perm[cell], (nf, 1)) Kinv = np.tile(invperm[cell], (nf, 1)) @@ -218,7 +230,7 @@ def process_cell(cell): proj = np.eye(nf) - np.einsum('ni,mi->nm', Q, Q) S = np.einsum('ij,j,jk->ik', proj, w, proj) S = (vol[cell] / 2.0) * S - indicators = ComputeMFD._get_indicators(proj, Mloc, S) + indicators = ComputeMFD.__get_indicators(proj, Mloc, S) return (cell, *indicators) from concurrent.futures import ThreadPoolExecutor, as_completed @@ -227,12 +239,16 @@ def process_cell(cell): futures = [executor.submit(process_cell, i) for i in range(total)] results = [future.result() for future in as_completed(futures)] - for cell, c, cr, lm, lM in results: - M[cell] = (c, cr, lm, lM) + for cell, c, cr, lm, lM, idem, ortho in results: + M[cell] = (c, cr, lm, lM, idem, ortho) return M def compute_bdlvm(self, mesh): - perm = vtk_to_numpy(mesh.GetCellData().GetArray("Permeability")) + """Computes the MFD indicators using the BDLVM method for the input mesh and returns the results as a list of tuples containing the indicators for each cell. + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the BDLVM method. + """ + perm = vtk_to_numpy(mesh.GetCellData().GetArray(self.permeability_field)) cell2face = {} [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] ncells = len(self.cell_centers) @@ -259,7 +275,7 @@ def process_cell(cell): S = gamma * proj Mloc = np.einsum('ij,i,j->ij', Mloc, 1.0 / area, 1.0 / area) S = np.einsum('ij,i,j->ij', S, 1.0 / area, 1.0 / area) - indicators = ComputeMFD._get_indicators(proj, Mloc, S) + indicators = ComputeMFD.__get_indicators(proj, Mloc, S) return (cell, *indicators) from concurrent.futures import ThreadPoolExecutor, as_completed @@ -268,25 +284,38 @@ def process_cell(cell): futures = [executor.submit(process_cell, i) for i in range(total)] results = [future.result() for future in as_completed(futures)] - for cell, c, cr, lm, lM in results: - M[cell] = (c, cr, lm, lM) + for cell, c, cr, lm, lM, idem, ortho in results: + M[cell] = (c, cr, lm, lM, idem, ortho) return M @staticmethod - def _get_indicators(K, M, S, compute_eigs=True): + def __get_indicators(K, M, S, compute_eigs=True): + """Computes the MFD indicators (condition number, consistency, eigenvalues, idempotence, and orthogonality) based on the local MFD matrices K, M, and S for a given cell.""" Sr = K.T @ S @ K Mr = K.T @ M @ K - if compute_eigs: - try: - lambdas = np.linalg.eigvals(np.linalg.solve(Mr, Sr)) - except Exception: - lambdas = np.linalg.eigvals(np.linalg.pinv(Mr) @ Sr) - else: + + def get_eigs(Mx, Sx): lambdas = [] - return np.linalg.cond(M + S), np.linalg.cond(Mr + Sr), np.min(lambdas.real), np.max(lambdas.real) + if compute_eigs: + try: + lambdas = np.linalg.eigvals(Mx + Sx) + except Exception: + lambdas = np.linalg.eigvals(np.linalg.pinv(Mx) @ Sx) + return lambdas + + lambdas = get_eigs(M, S) + return ( + np.linalg.cond(M + S), + np.linalg.matrix_norm(Sr), + np.min(lambdas.real), + np.max(lambdas.real), + np.linalg.matrix_norm(K - K @ K), + np.linalg.matrix_norm(K @ S), + ) @staticmethod def centroid_3d_polygon(mesh, point_indices: List[int], area_tolerance: float = 0.0): + """Computes the centroid, normal vector, and area of a 3D polygon defined by the given point indices in the input mesh, using Newell's method.""" n = len(point_indices) if n < 2: raise ValueError(f"Polygon must have at least 2 points, got {n}.") @@ -313,6 +342,9 @@ def centroid_3d_polygon(mesh, point_indices: List[int], area_tolerance: float = @staticmethod def compute_newell(mesh): + """Computes the faces of the mesh using Newell's method and returns a dictionary of faces and a mapping from face indices to cell indices. + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the faces using Newell's method.""" faces = {} face2cell = {} @@ -348,59 +380,22 @@ def process_cell(cellid): return faces, face2cell @staticmethod - def reorient_normal(normals, cell2vec): + def __reorient_normal(normals, cell2vec): + """Reorients the normal vectors of the faces based on the direction of the vector from the cell center to the face center, ensuring that the normals point outward from the cell.""" flag = np.einsum('ni,ni->n', normals, cell2vec) < 0 normals[flag] = -normals[flag] return normals @staticmethod - def compute_cell_centroids(mesh) -> np.ndarray: + def __compute_cell_centroids(mesh) -> np.ndarray: + """Computes the centroids of the cells in the input mesh and returns them as a NumPy array.""" vtkCellCenters = vtk.vtkCellCenters() vtkCellCenters.SetInputData(mesh) vtkCellCenters.Update() return vtk_to_numpy(vtkCellCenters.GetOutput().GetPoints().GetData()) - -def create_hex_grid(nx=2, ny=2, nz=2) -> vtk.vtkUnstructuredGrid: - mesh = vtk.vtkUnstructuredGrid() - points = vtk.vtkPoints() - for k in range(nz + 1): - for j in range(ny + 1): - for i in range(nx + 1): - points.InsertNextPoint(i / nx, j / ny, k / nz) - mesh.SetPoints(points) - - def pid(i, j, k): - return k * (ny + 1) * (nx + 1) + j * (nx + 1) + i - - for k in range(nz): - for j in range(ny): - for i in range(nx): - ids = vtk.vtkIdList() - for node in [ - pid(i, j, k), - pid(i + 1, j, k), - pid(i + 1, j + 1, k), - pid(i, j + 1, k), - pid(i, j, k + 1), - pid(i + 1, j, k + 1), - pid(i + 1, j + 1, k + 1), - pid(i, j + 1, k + 1), - ]: - ids.InsertNextId(node) - mesh.InsertNextCell(vtk.VTK_HEXAHEDRON, ids) - return add_permeability(mesh) - - -def add_permeability(mesh): - perm = np.ones((mesh.GetNumberOfCells(), 3)) - vtk_perm = numpy_to_vtk(perm, array_type=vtk.VTK_UNSIGNED_INT) - vtk_perm.SetName("Permeability") - mesh.GetCellData().AddArray(vtk_perm) - return mesh - - -def add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: +def __add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: + """Computes the volumes of the cells in the input mesh and adds them as a new cell data array named 'Volume', returning the modified mesh.""" quality = vtk.vtkMeshQuality() quality.SetInputData(mesh) quality.SetHexQualityMeasureToVolume() @@ -417,32 +412,44 @@ def add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: @dataclass(frozen=True) class Options: + """Configuration options for MFD computation, including the output settings, interface pressure type, and permeability field name.""" vtkOutput: VtkOutput ip: str + permeability: str = "Permeability" @dataclass(frozen=True) class Result: + """Result of the MFD computation.""" info: str def meshAction(mesh, options: Options) -> Result: - mfd = ComputeMFD(mesh) + """Performs the MFD computation on the input mesh using the specified options and returns the results as a Result object containing information about the computation. + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators. + options (Options): The configuration options for the MFD computation, including the output settings, interface pressure type, and permeability field name. + Returns: + Result: An object containing information about the MFD computation, including the output file path and the interface pressure type used for the computation.""" + mfd = ComputeMFD(mesh, permeability_field=options.permeability) try: mfd.set_IP(IPType[options.ip]) except Exception: raise ValueError(f"Unsupported IP type: {options.ip}") res = mfd.compute(mesh) - try: - mesh = attach_results(mesh, res, f"{options.ip}_Results") - except Exception: - mesh = attach_matrix_as_multicomponent(mesh, res, f"{options.ip}_Results") + mesh = __attach_results(mesh, res, f"{options.ip}_Results") writeMesh(mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger) return Result(info=f"MFD {options.ip} computed and written to {options.vtkOutput.output}") def action(vtuInputFile: str, options: Options) -> Result: + """Main entry point for the MFD computation action, which reads the input mesh from the specified VTU file, performs the MFD computation using the provided options, and returns the results as a Result object containing information about the computation. + Args: + vtuInputFile (str): The file path to the input VTU mesh for which to compute the MFD indicators. + options (Options): The configuration options for the MFD computation, including the output settings, interface pressure type, and permeability field name. + Returns: + Result: An object containing information about the MFD computation, including the output file path and the interface pressure type used for the computation.""" mesh = readUnstructuredGrid(vtuInputFile) return meshAction(mesh, options) diff --git a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py index 521df6aec..3f52833b5 100644 --- a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py +++ b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. -# SPDX-FileContributor: Your Name +# SPDX-FileContributor: Jacques Franc from __future__ import annotations from argparse import _SubParsersAction from typing import Any @@ -11,20 +11,41 @@ __IP = "ip" +__PERMEABILITY = "permeability" def convert( parsedOptions: dict[ str, Any ] ) -> Options: + """Convert parsed command-line options to Options object. + + Args: + parsedOptions: Dictionary of parsed command-line options. + + Returns: + Options: Configuration options for MFD computation. + """ vtkOut = vtkOutputParsing.convert( parsedOptions ) ip = parsedOptions[ __IP ] - return Options( vtkOutput=vtkOut, ip=ip ) + permeability = parsedOptions[ __PERMEABILITY ] + return Options( vtkOutput=vtkOut, ip=ip, permeability=permeability ) def fillSubparser( subparsers: _SubParsersAction[ Any ] ) -> None: + """Fill the argument parser for the map MFD action. + Args: + subparsers: Subparsers from the main argument parser + """ p = subparsers.add_parser( "map-mfd", help="Compute MFD indicators and attach results to a VTU" ) addVtuInputFileArgument( p, required=False ) - p.add_argument( "--" + __IP, type=str, choices=("TPFA", "QTPFA", "BdLVM"), required=True ) + p.add_argument( "--" + __IP, type=str, choices=("QTPFA", "BdLVM"), required=True ) + p.add_argument( "--" + __PERMEABILITY, type=str, default="Permeability", + help="Name of the cell data array that stores permeability." ) vtkOutputParsing.fillVtkOutputSubparser( p ) def displayResults( options: Options, result: Result ) -> None: + """Display the results of the MFD computation. + Args: + options: The options used for the MFD computation. + result: The results of the MFD computation. + """ setupLogger.info( result.info ) diff --git a/mesh-doctor/tests/test_MFD.py b/mesh-doctor/tests/test_MFD.py index 0a9c927f4..d5798f31f 100644 --- a/mesh-doctor/tests/test_MFD.py +++ b/mesh-doctor/tests/test_MFD.py @@ -1,14 +1,58 @@ -from geos.mesh_doctor.actions.mapMFD import create_hex_grid, add_cell_volumes, meshAction, Options +import numpy as np +import vtk +from vtkmodules.util.numpy_support import vtk_to_numpy, numpy_to_vtk + from geos.mesh.io.vtkIO import VtkOutput +from geos.mesh_doctor.actions.mapMFD import __add_cell_volumes, meshAction, Options + +def __create_hex_grid(nx=2, ny=2, nz=2) -> vtk.vtkUnstructuredGrid: + """Creates a simple hexagonal grid with the specified number of cells in each direction.""" + mesh = vtk.vtkUnstructuredGrid() + points = vtk.vtkPoints() + for k in range(nz + 1): + for j in range(ny + 1): + for i in range(nx + 1): + points.InsertNextPoint(i / nx, j / ny, k / nz) + mesh.SetPoints(points) + + def pid(i, j, k): + return k * (ny + 1) * (nx + 1) + j * (nx + 1) + i + + for k in range(nz): + for j in range(ny): + for i in range(nx): + ids = vtk.vtkIdList() + for node in [ + pid(i, j, k), + pid(i + 1, j, k), + pid(i + 1, j + 1, k), + pid(i, j + 1, k), + pid(i, j, k + 1), + pid(i + 1, j, k + 1), + pid(i + 1, j + 1, k + 1), + pid(i, j + 1, k + 1), + ]: + ids.InsertNextId(node) + mesh.InsertNextCell(vtk.VTK_HEXAHEDRON, ids) + return __add_permeability(mesh) + +def __add_permeability(mesh, permeability_value=1.0) -> vtk.vtkUnstructuredGrid: + """Adds a simple permeability field to the mesh.""" + perm = np.ones((mesh.GetNumberOfCells(), 3)) * permeability_value + vtk_perm = numpy_to_vtk(perm, array_type=vtk.VTK_UNSIGNED_INT) + vtk_perm.SetName("Permeability") + mesh.GetCellData().AddArray(vtk_perm) + return mesh def test_mfd_indicators_hex_grid(tmp_path): - mesh = create_hex_grid(1, 1, 1) - mesh = add_cell_volumes(mesh) + mesh = __create_hex_grid(1, 1, 1) + mesh = __add_cell_volumes(mesh) + mesh = __add_permeability(mesh) for ip in ["QTPFA", "BdLVM"]: out = tmp_path / f"mfd_{ip}.vtu" - options = Options(vtkOutput=VtkOutput(output=str(out), isDataModeBinary=True), ip=ip) + options = Options(vtkOutput=VtkOutput(output=str(out), isDataModeBinary=True), ip=ip, permeability="Permeability") meshAction(mesh, options) arr = mesh.GetCellData().GetArray(f"{ip}_Results") From 3091b12a14da8d0699b2cb3a79e83d3650fb4474 Mon Sep 17 00:00:00 2001 From: jacques franc Date: Tue, 26 May 2026 15:03:24 +0200 Subject: [PATCH 3/7] type --- .../src/geos/mesh_doctor/actions/mapMFD.py | 35 ++++++++++++------- mesh-doctor/tests/test_MFD.py | 19 ++++++++-- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py index de32c6fb9..88086c1f8 100644 --- a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -76,7 +76,11 @@ class Face: def get_cell_faces(cell: vtk.vtkCell) -> List[List[int]]: - """Returns the list of faces for a given cell, where each face is represented by a list of vertex indexes.""" + """Returns the list of faces for a given cell, where each face is represented by a list of vertex indexes. + Args: + cell: The input cell for which to retrieve the faces. + Returns: + List[List[int]]: The list of faces for the given cell. Each face is represented as a list of vertex indexes.""" cell_type = cell.GetCellType() if cell_type not in _FACE_TABLE: unknown_name = vtk.vtkCellTypes.GetClassNameFromTypeId(cell_type) or str(cell_type) @@ -86,8 +90,13 @@ def get_cell_faces(cell: vtk.vtkCell) -> List[List[int]]: # TODO: Refactor the ComputeMFD class to separate the computation of indicators from the mesh processing logic, and to allow for more flexible input parameters (e.g., different permeability fields, additional indicators, etc.). -def __filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: - """Filters the input mesh to retain only volume cells (3D cells) and returns the resulting mesh.""" +def filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: + """Filters the input mesh to retain only volume cells (3D cells) and returns the resulting mesh. + Args: + mesh: The input mesh to filter. + Returns: + vtk.vtkDataSet: The filtered mesh containing only volume cells (3D cells).""" + volumeIds = vtk.vtkIdTypeArray() for i in range(mesh.GetNumberOfCells()): dim = mesh.GetCell(i).GetCellDimension() @@ -130,8 +139,8 @@ def __init__(self, mesh, permeability_field: str = "Permeability"): self.faces, self.face2cell = ComputeMFD.compute_newell(mesh) self.cell_centers = ComputeMFD.__compute_cell_centroids(mesh) # make sure that we have always Volume and don't act on surfaces or lines - mesh = __filterVolumeCells(mesh) - mesh = __add_cell_volumes(mesh) + mesh = filterVolumeCells(mesh) + mesh = add_cell_volumes(mesh) self.permeability_field = permeability_field def set_IP(self, ip_type: IPType): @@ -297,10 +306,8 @@ def __get_indicators(K, M, S, compute_eigs=True): def get_eigs(Mx, Sx): lambdas = [] if compute_eigs: - try: - lambdas = np.linalg.eigvals(Mx + Sx) - except Exception: - lambdas = np.linalg.eigvals(np.linalg.pinv(Mx) @ Sx) + lambdas = np.linalg.eigvals(Mx + Sx) + return lambdas lambdas = get_eigs(M, S) @@ -310,7 +317,7 @@ def get_eigs(Mx, Sx): np.min(lambdas.real), np.max(lambdas.real), np.linalg.matrix_norm(K - K @ K), - np.linalg.matrix_norm(K @ S), + np.linalg.matrix_norm((S-Sr)), ) @staticmethod @@ -394,8 +401,12 @@ def __compute_cell_centroids(mesh) -> np.ndarray: vtkCellCenters.Update() return vtk_to_numpy(vtkCellCenters.GetOutput().GetPoints().GetData()) -def __add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: - """Computes the volumes of the cells in the input mesh and adds them as a new cell data array named 'Volume', returning the modified mesh.""" +def add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: + """Computes the volumes of the cells in the input mesh and adds them as a new cell data array named 'Volume', returning the modified mesh. + Args: + mesh (vtk.vtkUnstructuredGrid): The input mesh for which to compute and add the cell volumes. + Returns: + vtk.vtkUnstructuredGrid: The modified mesh with the computed cell volumes added as a new cell data array named 'Volume'.""" quality = vtk.vtkMeshQuality() quality.SetInputData(mesh) quality.SetHexQualityMeasureToVolume() diff --git a/mesh-doctor/tests/test_MFD.py b/mesh-doctor/tests/test_MFD.py index d5798f31f..725b62128 100644 --- a/mesh-doctor/tests/test_MFD.py +++ b/mesh-doctor/tests/test_MFD.py @@ -3,7 +3,7 @@ from vtkmodules.util.numpy_support import vtk_to_numpy, numpy_to_vtk from geos.mesh.io.vtkIO import VtkOutput -from geos.mesh_doctor.actions.mapMFD import __add_cell_volumes, meshAction, Options +from geos.mesh_doctor.actions.mapMFD import add_cell_volumes, meshAction, Options def __create_hex_grid(nx=2, ny=2, nz=2) -> vtk.vtkUnstructuredGrid: """Creates a simple hexagonal grid with the specified number of cells in each direction.""" @@ -40,14 +40,14 @@ def pid(i, j, k): def __add_permeability(mesh, permeability_value=1.0) -> vtk.vtkUnstructuredGrid: """Adds a simple permeability field to the mesh.""" perm = np.ones((mesh.GetNumberOfCells(), 3)) * permeability_value - vtk_perm = numpy_to_vtk(perm, array_type=vtk.VTK_UNSIGNED_INT) + vtk_perm = numpy_to_vtk(perm, array_type=vtk.VTK_DOUBLE) vtk_perm.SetName("Permeability") mesh.GetCellData().AddArray(vtk_perm) return mesh def test_mfd_indicators_hex_grid(tmp_path): mesh = __create_hex_grid(1, 1, 1) - mesh = __add_cell_volumes(mesh) + mesh = add_cell_volumes(mesh) mesh = __add_permeability(mesh) for ip in ["QTPFA", "BdLVM"]: @@ -59,3 +59,16 @@ def test_mfd_indicators_hex_grid(tmp_path): assert arr is not None assert arr.GetNumberOfTuples() == mesh.GetNumberOfCells() assert out.exists() + + if ip == "QTPFA": + varr = vtk_to_numpy(arr)[0] + assert np.isclose(varr[0], 1) #conditioning + assert np.isclose(varr[2:3], 0.5) #spectrum + assert np.isclose(varr[4], 0, atol=1e-15) #orthogonality + assert np.isclose(varr[5], 0, atol=1e-15) #consistency + elif ip == "BdLVM": + varr = vtk_to_numpy(arr)[0] + assert np.isclose(varr[0], 3) #conditioning + assert np.allclose(varr[2:4], np.array([1/6, 1/2]), atol=1e-15) #spectrum + assert np.isclose(varr[4], 0, atol=1e-15) #orthogonality + assert np.isclose(varr[5], 0, atol=1e-15) #consistency From e26e033ca28d5b4c91ffe33215f8d8fe9afd49f8 Mon Sep 17 00:00:00 2001 From: jacques franc Date: Tue, 26 May 2026 15:34:24 +0200 Subject: [PATCH 4/7] formating --- .../src/geos/mesh_doctor/actions/mapMFD.py | 505 ++++++++++-------- .../geos/mesh_doctor/parsing/mapMFDParsing.py | 19 +- mesh-doctor/tests/test_MFD.py | 96 ++-- 3 files changed, 339 insertions(+), 281 deletions(-) diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py index 88086c1f8..7f2929d7e 100644 --- a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -2,71 +2,83 @@ # SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. # SPDX-FileContributor: Jacques Franc, Copilot from dataclasses import dataclass -from enum import StrEnum -from typing import Any, Dict, List, Tuple +from enum import Enum +from typing import Dict, List, Tuple, Any import numpy as np from vtkmodules.util.numpy_support import vtk_to_numpy, numpy_to_vtk import vtk +import sys from geos.mesh.io.vtkIO import writeMesh, VtkOutput, readUnstructuredGrid from geos.mesh_doctor.parsing.cliParsing import setupLogger -class IPType(StrEnum): +if sys.version_info >= ( 3, 11 ): + from enum import StrEnum +else: + + class StrEnum( str, Enum ): + """String enumeration base class for Python versions < 3.11.""" + pass + + +class IPType( StrEnum ): """Interface Pressure" type for MFD indicators.""" # TPFA = "TPFA" QTPFA = "QTPFA" BdLVM = "BdLVM" -@dataclass(frozen=True) + + +@dataclass( frozen=True ) class Face: """Represents a face of a cell in the mesh, defined by its vertex indexes, center, normal vector, and area.""" - indexes: List[int] - center: List[float] - normal: List[float] + indexes: List[ int ] + center: List[ float ] + normal: List[ float ] area: float VTK_TETRA_FACES = [ - [0, 1, 3], - [1, 2, 3], - [0, 3, 2], - [0, 2, 1], + [ 0, 1, 3 ], + [ 1, 2, 3 ], + [ 0, 3, 2 ], + [ 0, 2, 1 ], ] VTK_VOXEL_FACES = [ - [0, 2, 6, 4], - [1, 3, 7, 5], - [0, 1, 5, 4], - [2, 3, 7, 6], - [0, 1, 3, 2], - [4, 5, 7, 6], + [ 0, 2, 6, 4 ], + [ 1, 3, 7, 5 ], + [ 0, 1, 5, 4 ], + [ 2, 3, 7, 6 ], + [ 0, 1, 3, 2 ], + [ 4, 5, 7, 6 ], ] VTK_HEXAHEDRON_FACES = [ - [0, 3, 2, 1], - [4, 5, 6, 7], - [0, 1, 5, 4], - [2, 3, 7, 6], - [0, 4, 7, 3], - [1, 2, 6, 5], + [ 0, 3, 2, 1 ], + [ 4, 5, 6, 7 ], + [ 0, 1, 5, 4 ], + [ 2, 3, 7, 6 ], + [ 0, 4, 7, 3 ], + [ 1, 2, 6, 5 ], ] VTK_WEDGE_FACES = [ - [0, 1, 2], - [3, 5, 4], - [0, 3, 4, 1], - [1, 4, 5, 2], - [0, 2, 5, 3], + [ 0, 1, 2 ], + [ 3, 5, 4 ], + [ 0, 3, 4, 1 ], + [ 1, 4, 5, 2 ], + [ 0, 2, 5, 3 ], ] VTK_PYRAMID_FACES = [ - [0, 3, 2, 1], - [0, 1, 4], - [1, 2, 4], - [2, 3, 4], - [3, 0, 4], + [ 0, 3, 2, 1 ], + [ 0, 1, 4 ], + [ 1, 2, 4 ], + [ 2, 3, 4 ], + [ 3, 0, 4 ], ] -_FACE_TABLE: Dict[int, List[List[int]]] = { +_FACE_TABLE: Dict[ int, List[ List[ int ] ] ] = { vtk.VTK_TETRA: VTK_TETRA_FACES, vtk.VTK_VOXEL: VTK_VOXEL_FACES, vtk.VTK_HEXAHEDRON: VTK_HEXAHEDRON_FACES, @@ -75,96 +87,106 @@ class Face: } -def get_cell_faces(cell: vtk.vtkCell) -> List[List[int]]: +def get_cell_faces( cell: vtk.vtkCell ) -> List[ List[ int ] ]: """Returns the list of faces for a given cell, where each face is represented by a list of vertex indexes. + Args: cell: The input cell for which to retrieve the faces. + Returns: - List[List[int]]: The list of faces for the given cell. Each face is represented as a list of vertex indexes.""" + List[List[int]]: The list of faces for the given cell. Each face is represented as a list of vertex indexes. + """ cell_type = cell.GetCellType() if cell_type not in _FACE_TABLE: - unknown_name = vtk.vtkCellTypes.GetClassNameFromTypeId(cell_type) or str(cell_type) - raise ValueError(f"Unsupported cell type '{unknown_name}' (id={cell_type}).") - local_to_global = [cell.GetPointId(i) for i in range(cell.GetNumberOfPoints())] - return [[local_to_global[local_idx] for local_idx in face] for face in _FACE_TABLE[cell_type]] + unknown_name = vtk.vtkCellTypes.GetClassNameFromTypeId( cell_type ) or str( cell_type ) + raise ValueError( f"Unsupported cell type '{unknown_name}' (id={cell_type})." ) + local_to_global = [ cell.GetPointId( i ) for i in range( cell.GetNumberOfPoints() ) ] + return [ [ local_to_global[ local_idx ] for local_idx in face ] for face in _FACE_TABLE[ cell_type ] ] # TODO: Refactor the ComputeMFD class to separate the computation of indicators from the mesh processing logic, and to allow for more flexible input parameters (e.g., different permeability fields, additional indicators, etc.). -def filterVolumeCells(mesh: vtk.vtkDataSet) -> vtk.vtkDataSet: +def filterVolumeCells( mesh: vtk.vtkDataSet ) -> vtk.vtkDataSet: """Filters the input mesh to retain only volume cells (3D cells) and returns the resulting mesh. + Args: mesh: The input mesh to filter. - Returns: - vtk.vtkDataSet: The filtered mesh containing only volume cells (3D cells).""" + Returns: + vtk.vtkDataSet: The filtered mesh containing only volume cells (3D cells). + """ volumeIds = vtk.vtkIdTypeArray() - for i in range(mesh.GetNumberOfCells()): - dim = mesh.GetCell(i).GetCellDimension() + for i in range( mesh.GetNumberOfCells() ): + dim = mesh.GetCell( i ).GetCellDimension() if dim == 3: - volumeIds.InsertNextValue(i) + volumeIds.InsertNextValue( i ) sn = vtk.vtkSelectionNode() - sn.SetFieldType(vtk.vtkSelectionNode.CELL) - sn.SetContentType(vtk.vtkSelectionNode.INDICES) - sn.SetSelectionList(volumeIds) + sn.SetFieldType( vtk.vtkSelectionNode.CELL ) + sn.SetContentType( vtk.vtkSelectionNode.INDICES ) + sn.SetSelectionList( volumeIds ) sel = vtk.vtkSelection() - sel.AddNode(sn) + sel.AddNode( sn ) ext = vtk.vtkExtractSelection() - ext.SetInputData(0, mesh) - ext.SetInputData(1, sel) + ext.SetInputData( 0, mesh ) + ext.SetInputData( 1, sel ) ext.Update() return ext.GetOutput() -def __attach_results(mesh: vtk.vtkDataSet, matrices: List[Tuple[float, float, float, float, float, float]], field_name: str = "MatrixField") -> vtk.vtkDataSet: + +def __attach_results( mesh: vtk.vtkDataSet, + matrices: List[ Tuple[ float, float, float, float, float, float ] ], + field_name: str = "MatrixField" ) -> vtk.vtkDataSet: """Attaches the computed MFD indicators to the mesh as a new cell data array with the specified field name.""" - flat = np.array([np.asarray(m, dtype=np.float64) for m in matrices]) - vtk_arr = numpy_to_vtk(flat, deep=True, array_type=vtk.VTK_DOUBLE) - vtk_arr.SetName(field_name) - vtk_arr.SetNumberOfComponents(6) - vtk_arr.SetComponentName(0, "condM") - vtk_arr.SetComponentName(1, "consistency") - vtk_arr.SetComponentName(2, "lambda_m") - vtk_arr.SetComponentName(3, "lambda_M") - vtk_arr.SetComponentName(4, "idempotence") - vtk_arr.SetComponentName(5, "orthogonality") - mesh.GetCellData().AddArray(vtk_arr) + flat = np.array( [ np.asarray( m, dtype=np.float64 ) for m in matrices ] ) + vtk_arr = numpy_to_vtk( flat, deep=True, array_type=vtk.VTK_DOUBLE ) + vtk_arr.SetName( field_name ) + vtk_arr.SetNumberOfComponents( 6 ) + vtk_arr.SetComponentName( 0, "condM" ) + vtk_arr.SetComponentName( 1, "consistency" ) + vtk_arr.SetComponentName( 2, "lambda_m" ) + vtk_arr.SetComponentName( 3, "lambda_M" ) + vtk_arr.SetComponentName( 4, "idempotence" ) + vtk_arr.SetComponentName( 5, "orthogonality" ) + mesh.GetCellData().AddArray( vtk_arr ) return mesh + class ComputeMFD: """Class responsible for computing MFD indicators for a given mesh and permeability field, based on the specified interface pressure type (TPFA, QTPFA, or BdLVM).""" - def __init__(self, mesh, permeability_field: str = "Permeability"): + + def __init__( self, mesh: vtk.vtkDataSet, permeability_field: str = "Permeability" ) -> None: """Initializes the ComputeMFD instance by computing the faces, cell centers, and permeability field from the input mesh.""" - self.faces, self.face2cell = ComputeMFD.compute_newell(mesh) - self.cell_centers = ComputeMFD.__compute_cell_centroids(mesh) + self.faces, self.face2cell = ComputeMFD.compute_newell( mesh ) + self.cell_centers = ComputeMFD.__compute_cell_centroids( mesh ) # make sure that we have always Volume and don't act on surfaces or lines - mesh = filterVolumeCells(mesh) - mesh = add_cell_volumes(mesh) + mesh = filterVolumeCells( mesh ) + mesh = add_cell_volumes( mesh ) self.permeability_field = permeability_field - def set_IP(self, ip_type: IPType): + def set_IP( self, ip_type: IPType ) -> None: """Sets the interface pressure type for the MFD computation. - - Args: + + Args: ip_type (IPType): The interface pressure type to use for the MFD computation (TPFA, QTPFA, or BdLVM). """ self.ip_type = ip_type - def compute(self, mesh): + def compute( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float, float, float, float, float ] ]: """Computes the MFD indicators for the input mesh based on the specified interface pressure type and returns the results as a list of tuples containing the indicators for each cell. - - Args: - mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators. + + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators. """ # if self.ip_type == IPType.TPFA: - # return self.compute_tpfa(mesh) + # return self.compute_tpfa(mesh) if self.ip_type == IPType.QTPFA: - return self.compute_quasitpfa(mesh) + return self.compute_quasitpfa( mesh ) elif self.ip_type == IPType.BdLVM: - return self.compute_bdlvm(mesh) + return self.compute_bdlvm( mesh ) else: - raise ValueError(f"Unsupported IP type: {self.ip_type}") + raise ValueError( f"Unsupported IP type: {self.ip_type}" ) # def compute_tpfa(self, mesh) -> list[np.ndarray]: # """Computes the MFD indicators using the TPFA method for the input mesh and returns the results as a list of tuples containing the indicators for each cell. @@ -206,222 +228,243 @@ def compute(self, mesh): # M[cell] = Mloc # return M - def compute_quasitpfa(self, mesh): + def compute_quasitpfa( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float, float, float, float, float ] ]: """Computes the MFD indicators using the QTPFA method for the input mesh and returns the results as a list of tuples containing the indicators for each cell. - Args: - mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the QTPFA method. + + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the QTPFA method. """ - perm = vtk_to_numpy(mesh.GetCellData().GetArray(self.permeability_field)) + perm = vtk_to_numpy( mesh.GetCellData().GetArray( self.permeability_field ) ) invperm = 1.0 / perm - vol = vtk_to_numpy(mesh.GetCellData().GetArray("Volume")) - faces, self.face2cell = ComputeMFD.compute_newell(mesh) - cell2face = {} - [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] - cell_centers = ComputeMFD.__compute_cell_centroids(mesh) - ncells = len(cell_centers) - M = [None] * ncells - face_centers = np.array([faces[k].center for k in self.face2cell]) - face_normals = np.array([faces[k].normal for k in self.face2cell]) - face_area = np.array([faces[k].area for k in self.face2cell]) - - def process_cell(cell): - face_indices = cell2face.get(cell) - nf = len(face_indices) - fc_vec = face_centers[face_indices] - cell_centers[cell] + vol = vtk_to_numpy( mesh.GetCellData().GetArray( "Volume" ) ) + faces, self.face2cell = ComputeMFD.compute_newell( mesh ) + cell2face: Dict[ int, List[ int ] ] = {} + + for k, v in self.face2cell.items(): + for cell in v: + cell2face.setdefault( cell, [] ).append( k ) + + cell_centers = ComputeMFD.__compute_cell_centroids( mesh ) + ncells = len( cell_centers ) + M = [ ( 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 ) ] * ncells + face_centers = np.array( [ faces[ k ].center for k in self.face2cell ] ) + face_normals = np.array( [ faces[ k ].normal for k in self.face2cell ] ) + face_area = np.array( [ faces[ k ].area for k in self.face2cell ] ) + + def process_cell( cell: int ) -> Tuple[ int, float, float, float, float, float, float ]: + face_indices = cell2face.get( cell, [] ) + nf = len( face_indices ) + fc_vec = face_centers[ face_indices ] - cell_centers[ cell ] c = fc_vec - loc_normals = ComputeMFD.__reorient_normal(face_normals[face_indices], c) - n = loc_normals * face_area[face_indices, None] - K = np.tile(perm[cell], (nf, 1)) - Kinv = np.tile(invperm[cell], (nf, 1)) - Mloc = np.einsum('ni,ni,mi->nm', c, Kinv, c) / vol[cell] - w = np.einsum('ni,ni,ni->n', n, K, n) - Q, _ = np.linalg.qr(n, mode='reduced') - proj = np.eye(nf) - np.einsum('ni,mi->nm', Q, Q) - S = np.einsum('ij,j,jk->ik', proj, w, proj) - S = (vol[cell] / 2.0) * S - indicators = ComputeMFD.__get_indicators(proj, Mloc, S) - return (cell, *indicators) + loc_normals = ComputeMFD.__reorient_normal( face_normals[ face_indices ], c ) + n = loc_normals * face_area[ face_indices, None ] + K = np.tile( perm[ cell ], ( nf, 1 ) ) + Kinv = np.tile( invperm[ cell ], ( nf, 1 ) ) + Mloc = np.einsum( 'ni,ni,mi->nm', c, Kinv, c ) / vol[ cell ] + w = np.einsum( 'ni,ni,ni->n', n, K, n ) + Q, _ = np.linalg.qr( n, mode='reduced' ) + proj = np.eye( nf ) - np.einsum( 'ni,mi->nm', Q, Q ) + S = np.einsum( 'ij,j,jk->ik', proj, w, proj ) + S = ( vol[ cell ] / 2.0 ) * S + indicators = ComputeMFD.__get_indicators( proj, Mloc, S ) + return ( cell, *indicators ) from concurrent.futures import ThreadPoolExecutor, as_completed - total = len(cell2face) - with ThreadPoolExecutor(max_workers=8) as executor: - futures = [executor.submit(process_cell, i) for i in range(total)] - results = [future.result() for future in as_completed(futures)] + total = len( cell2face ) + with ThreadPoolExecutor( max_workers=8 ) as executor: + futures = [ executor.submit( process_cell, i ) for i in range( total ) ] + results = [ future.result() for future in as_completed( futures ) ] for cell, c, cr, lm, lM, idem, ortho in results: - M[cell] = (c, cr, lm, lM, idem, ortho) + M[ cell ] = ( c, cr, lm, lM, idem, ortho ) return M - def compute_bdlvm(self, mesh): + def compute_bdlvm( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float, float, float, float, float ] ]: """Computes the MFD indicators using the BDLVM method for the input mesh and returns the results as a list of tuples containing the indicators for each cell. - Args: - mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the BDLVM method. + + Args: + mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators using the BDLVM method. """ - perm = vtk_to_numpy(mesh.GetCellData().GetArray(self.permeability_field)) - cell2face = {} - [cell2face.setdefault(cell, []).append(k) for k, v in self.face2cell.items() for cell in v] - ncells = len(self.cell_centers) - M = [None] * ncells - face_centers = np.array([self.faces[k].center for k in self.face2cell]) - face_normals = np.array([self.faces[k].normal for k in self.face2cell]) - face_area = np.array([self.faces[k].area for k in self.face2cell]) - - def process_cell(cell): - face_indices = cell2face.get(cell) - nf = len(face_indices) + perm = vtk_to_numpy( mesh.GetCellData().GetArray( self.permeability_field ) ) + cell2face: Dict[ int, List[ int ] ] = {} + + for k, v in self.face2cell.items(): + for cell in v: + cell2face.setdefault( cell, [] ).append( k ) + + ncells = len( self.cell_centers ) + M = [ ( 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 ) ] * ncells + face_centers = np.array( [ self.faces[ k ].center for k in self.face2cell ] ) + face_normals = np.array( [ self.faces[ k ].normal for k in self.face2cell ] ) + face_area = np.array( [ self.faces[ k ].area for k in self.face2cell ] ) + + def process_cell( cell: int ) -> Tuple[ int, float, float, float, float, float, float ]: + face_indices = cell2face.get( cell, [] ) + nf = len( face_indices ) gamma = 1.0 / nf - fc_vec = face_centers[face_indices] - self.cell_centers[cell] - area = face_area[face_indices] - c = np.einsum('ni,n->ni', fc_vec, area) - n = face_normals[face_indices] - K = np.tile(perm[cell], (nf, 1)) - n = np.einsum('ni,ni->ni', n, K) - CtN_inv = np.linalg.pinv(c.T @ n) - M0 = np.einsum('ni,ij,mj->nm', c, CtN_inv, c) - NtN_inv = np.linalg.pinv(n.T @ n) - proj = np.eye(nf) - np.einsum('ni,ij,mj->nm', n, NtN_inv, n) + fc_vec = face_centers[ face_indices ] - self.cell_centers[ cell ] + area = face_area[ face_indices ] + c = np.einsum( 'ni,n->ni', fc_vec, area ) + n = face_normals[ face_indices ] + K = np.tile( perm[ cell ], ( nf, 1 ) ) + n = np.einsum( 'ni,ni->ni', n, K ) + CtN_inv = np.linalg.pinv( c.T @ n ) + M0 = np.einsum( 'ni,ij,mj->nm', c, CtN_inv, c ) + NtN_inv = np.linalg.pinv( n.T @ n ) + proj = np.eye( nf ) - np.einsum( 'ni,ij,mj->nm', n, NtN_inv, n ) Mloc = M0 S = gamma * proj - Mloc = np.einsum('ij,i,j->ij', Mloc, 1.0 / area, 1.0 / area) - S = np.einsum('ij,i,j->ij', S, 1.0 / area, 1.0 / area) - indicators = ComputeMFD.__get_indicators(proj, Mloc, S) - return (cell, *indicators) + Mloc = np.einsum( 'ij,i,j->ij', Mloc, 1.0 / area, 1.0 / area ) + S = np.einsum( 'ij,i,j->ij', S, 1.0 / area, 1.0 / area ) + indicators = ComputeMFD.__get_indicators( proj, Mloc, S ) + return ( cell, *indicators ) from concurrent.futures import ThreadPoolExecutor, as_completed - total = len(cell2face) - with ThreadPoolExecutor(max_workers=8) as executor: - futures = [executor.submit(process_cell, i) for i in range(total)] - results = [future.result() for future in as_completed(futures)] + total = len( cell2face ) + with ThreadPoolExecutor( max_workers=8 ) as executor: + futures = [ executor.submit( process_cell, i ) for i in range( total ) ] + results = [ future.result() for future in as_completed( futures ) ] for cell, c, cr, lm, lM, idem, ortho in results: - M[cell] = (c, cr, lm, lM, idem, ortho) + M[ cell ] = ( c, cr, lm, lM, idem, ortho ) return M @staticmethod - def __get_indicators(K, M, S, compute_eigs=True): + def __get_indicators( K: np.ndarray, + M: np.ndarray, + S: np.ndarray, + compute_eigs: bool = True ) -> Tuple[ Any, Any, float, float, Any, Any ]: """Computes the MFD indicators (condition number, consistency, eigenvalues, idempotence, and orthogonality) based on the local MFD matrices K, M, and S for a given cell.""" Sr = K.T @ S @ K - Mr = K.T @ M @ K + K.T @ M @ K - def get_eigs(Mx, Sx): - lambdas = [] + def get_eigs( Mx: np.ndarray, Sx: np.ndarray ) -> np.ndarray: + lambdas: np.ndarray = np.array( [] ) if compute_eigs: - lambdas = np.linalg.eigvals(Mx + Sx) + lambdas, _ = np.linalg.eigvals( Mx + Sx ) return lambdas - lambdas = get_eigs(M, S) + lambdas = get_eigs( M, S ) return ( - np.linalg.cond(M + S), - np.linalg.matrix_norm(Sr), - np.min(lambdas.real), - np.max(lambdas.real), - np.linalg.matrix_norm(K - K @ K), - np.linalg.matrix_norm((S-Sr)), + np.linalg.cond( M + S ), + np.linalg.matrix_norm( Sr ), + np.min( lambdas.real ), + np.max( lambdas.real ), + np.linalg.matrix_norm( K - K @ K ), + np.linalg.matrix_norm( ( S - Sr ) ), ) @staticmethod - def centroid_3d_polygon(mesh, point_indices: List[int], area_tolerance: float = 0.0): + def centroid_3d_polygon( mesh: vtk.vtkDataSet, + point_indices: List[ int ], + area_tolerance: float = 0.0 ) -> Tuple[ np.ndarray, np.ndarray, float ]: """Computes the centroid, normal vector, and area of a 3D polygon defined by the given point indices in the input mesh, using Newell's method.""" - n = len(point_indices) + n = len( point_indices ) if n < 2: - raise ValueError(f"Polygon must have at least 2 points, got {n}.") - points = vtk_to_numpy(mesh.GetPoints().GetData()) - origin = points[point_indices[0]].copy() - normal = np.zeros(3) - center = np.zeros(3) - for a in range(n): - current = points[point_indices[a]] - origin - next_pt = points[point_indices[(a + 1) % n]] - origin - cross = np.cross(current, next_pt) + raise ValueError( f"Polygon must have at least 2 points, got {n}." ) + points = vtk_to_numpy( mesh.GetPoints().GetData() ) + origin = points[ point_indices[ 0 ] ].copy() + normal = np.zeros( 3 ) + center = np.zeros( 3 ) + for a in range( n ): + current = points[ point_indices[ a ] ] - origin + next_pt = points[ point_indices[ ( a + 1 ) % n ] ] - origin + cross = np.cross( current, next_pt ) normal += cross center += next_pt - area = np.linalg.norm(normal) + area = np.linalg.norm( normal ) center = center / n + origin if area > area_tolerance: normal /= area area *= 0.5 elif area < -area_tolerance: - raise ValueError("Negative area found") + raise ValueError( "Negative area found" ) else: return center, normal, 0.0 return center, normal, area @staticmethod - def compute_newell(mesh): + def compute_newell( mesh: vtk.vtkDataSet ) -> Tuple[ Dict[ int, Face ], Dict[ int, List[ int ] ] ]: """Computes the faces of the mesh using Newell's method and returns a dictionary of faces and a mapping from face indices to cell indices. + Args: - mesh (vtk.vtkDataSet): The input mesh for which to compute the faces using Newell's method.""" - faces = {} - face2cell = {} - - def process_cell(cellid): - local_faces = [] - local_map = [] - list_indexes = get_cell_faces(mesh.GetCell(cellid)) + mesh (vtk.vtkDataSet): The input mesh for which to compute the faces using Newell's method. + """ + faces: Dict[ int, Face ] = {} + face2cell: Dict[ int, List[ int ] ] = {} + + def process_cell( cellid: int ) -> Tuple[ List[ Tuple ], List[ Tuple ] ]: + local_faces: List[ Tuple ] = [] + local_map: List[ Tuple ] = [] + list_indexes = get_cell_faces( mesh.GetCell( cellid ) ) for indexes in list_indexes: - center, normal, area = ComputeMFD.centroid_3d_polygon(mesh, indexes) - key = tuple(sorted(indexes)) - local_faces.append((key, indexes, center, normal, area)) - local_map.append((key, cellid)) + center, normal, area = ComputeMFD.centroid_3d_polygon( mesh, indexes ) + key = tuple( sorted( indexes ) ) + local_faces.append( ( key, indexes, center, normal, area ) ) + local_map.append( ( key, cellid ) ) return local_faces, local_map next_index = 0 from concurrent.futures import ThreadPoolExecutor, as_completed total = mesh.GetNumberOfCells() - with ThreadPoolExecutor(max_workers=8) as executor: - futures = [executor.submit(process_cell, i) for i in range(total)] - results = [future.result() for future in as_completed(futures)] + with ThreadPoolExecutor( max_workers=8 ) as executor: + futures = [ executor.submit( process_cell, i ) for i in range( total ) ] + results = [ future.result() for future in as_completed( futures ) ] face_lookup = {} for local_faces, local_map in results: for key, indexes, center, normal, area in local_faces: if key not in face_lookup: - face_lookup[key] = next_index - faces[next_index] = Face(indexes=indexes, center=center, normal=normal, area=area) + face_lookup[ key ] = next_index + faces[ next_index ] = Face( indexes=indexes, center=center, normal=normal, area=area ) next_index += 1 for key, cellid in local_map: - face_index = face_lookup[key] - face2cell.setdefault(face_index, []).append(cellid) + face_index = face_lookup[ key ] + face2cell.setdefault( face_index, [] ).append( cellid ) return faces, face2cell @staticmethod - def __reorient_normal(normals, cell2vec): + def __reorient_normal( normals: np.ndarray, cell2vec: np.ndarray ) -> np.ndarray: """Reorients the normal vectors of the faces based on the direction of the vector from the cell center to the face center, ensuring that the normals point outward from the cell.""" - flag = np.einsum('ni,ni->n', normals, cell2vec) < 0 - normals[flag] = -normals[flag] + flag = np.einsum( 'ni,ni->n', normals, cell2vec ) < 0 + normals[ flag ] = -normals[ flag ] return normals @staticmethod - def __compute_cell_centroids(mesh) -> np.ndarray: + def __compute_cell_centroids( mesh: vtk.vtkDataSet ) -> np.ndarray: """Computes the centroids of the cells in the input mesh and returns them as a NumPy array.""" vtkCellCenters = vtk.vtkCellCenters() - vtkCellCenters.SetInputData(mesh) + vtkCellCenters.SetInputData( mesh ) vtkCellCenters.Update() - return vtk_to_numpy(vtkCellCenters.GetOutput().GetPoints().GetData()) + return vtk_to_numpy( vtkCellCenters.GetOutput().GetPoints().GetData() ) + -def add_cell_volumes(mesh: vtk.vtkUnstructuredGrid) -> vtk.vtkUnstructuredGrid: +def add_cell_volumes( mesh: vtk.vtkUnstructuredGrid ) -> vtk.vtkUnstructuredGrid: """Computes the volumes of the cells in the input mesh and adds them as a new cell data array named 'Volume', returning the modified mesh. + Args: - mesh (vtk.vtkUnstructuredGrid): The input mesh for which to compute and add the cell volumes. + mesh (vtk.vtkUnstructuredGrid): The input mesh for which to compute and add the cell volumes. + Returns: - vtk.vtkUnstructuredGrid: The modified mesh with the computed cell volumes added as a new cell data array named 'Volume'.""" + vtk.vtkUnstructuredGrid: The modified mesh with the computed cell volumes added as a new cell data array named 'Volume'. + """ quality = vtk.vtkMeshQuality() - quality.SetInputData(mesh) + quality.SetInputData( mesh ) quality.SetHexQualityMeasureToVolume() quality.SetTetQualityMeasureToVolume() quality.SetWedgeQualityMeasureToVolume() quality.SetPyramidQualityMeasureToVolume() quality.Update() - quality_array = vtk_to_numpy(quality.GetOutput().GetCellData().GetArray("Quality")) - volume_array = numpy_to_vtk(quality_array, deep=True) - volume_array.SetName("Volume") - mesh.GetCellData().AddArray(volume_array) + quality_array = vtk_to_numpy( quality.GetOutput().GetCellData().GetArray( "Quality" ) ) + volume_array = numpy_to_vtk( quality_array, deep=True ) + volume_array.SetName( "Volume" ) + mesh.GetCellData().AddArray( volume_array ) return mesh -@dataclass(frozen=True) +@dataclass( frozen=True ) class Options: """Configuration options for MFD computation, including the output settings, interface pressure type, and permeability field name.""" vtkOutput: VtkOutput @@ -429,38 +472,44 @@ class Options: permeability: str = "Permeability" -@dataclass(frozen=True) +@dataclass( frozen=True ) class Result: """Result of the MFD computation.""" info: str -def meshAction(mesh, options: Options) -> Result: +def meshAction( mesh: vtk.vtkDataSet, options: Options ) -> Result: """Performs the MFD computation on the input mesh using the specified options and returns the results as a Result object containing information about the computation. + Args: mesh (vtk.vtkDataSet): The input mesh for which to compute the MFD indicators. options (Options): The configuration options for the MFD computation, including the output settings, interface pressure type, and permeability field name. + Returns: - Result: An object containing information about the MFD computation, including the output file path and the interface pressure type used for the computation.""" - mfd = ComputeMFD(mesh, permeability_field=options.permeability) + Result: An object containing information about the MFD computation, including the output file path and the interface pressure type used for the computation. + """ + mfd = ComputeMFD( mesh, permeability_field=options.permeability ) try: - mfd.set_IP(IPType[options.ip]) - except Exception: - raise ValueError(f"Unsupported IP type: {options.ip}") + mfd.set_IP( IPType[ options.ip ] ) + except Exception as e: + raise ValueError( f"Unsupported IP type: {options.ip}" ) from e - res = mfd.compute(mesh) - mesh = __attach_results(mesh, res, f"{options.ip}_Results") + res = mfd.compute( mesh ) + mesh = __attach_results( mesh, res, f"{options.ip}_Results" ) - writeMesh(mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger) - return Result(info=f"MFD {options.ip} computed and written to {options.vtkOutput.output}") + writeMesh( mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger ) + return Result( info=f"MFD {options.ip} computed and written to {options.vtkOutput.output}" ) -def action(vtuInputFile: str, options: Options) -> Result: +def action( vtuInputFile: str, options: Options ) -> Result: """Main entry point for the MFD computation action, which reads the input mesh from the specified VTU file, performs the MFD computation using the provided options, and returns the results as a Result object containing information about the computation. - Args: - vtuInputFile (str): The file path to the input VTU mesh for which to compute the MFD indicators. - options (Options): The configuration options for the MFD computation, including the output settings, interface pressure type, and permeability field name. + + Args: + vtuInputFile (str): The file path to the input VTU mesh for which to compute the MFD indicators. + options (Options): The configuration options for the MFD computation, including the output settings, interface pressure type, and permeability field name. + Returns: - Result: An object containing information about the MFD computation, including the output file path and the interface pressure type used for the computation.""" - mesh = readUnstructuredGrid(vtuInputFile) - return meshAction(mesh, options) + Result: An object containing information about the MFD computation, including the output file path and the interface pressure type used for the computation. + """ + mesh = readUnstructuredGrid( vtuInputFile ) + return meshAction( mesh, options ) diff --git a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py index 3f52833b5..754a1d191 100644 --- a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py +++ b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py @@ -9,18 +9,17 @@ from geos.mesh_doctor.parsing import vtkOutputParsing from geos.mesh_doctor.actions.mapMFD import Options, Result - __IP = "ip" __PERMEABILITY = "permeability" def convert( parsedOptions: dict[ str, Any ] ) -> Options: """Convert parsed command-line options to Options object. - - Args: + + Args: parsedOptions: Dictionary of parsed command-line options. - - Returns: + + Returns: Options: Configuration options for MFD computation. """ vtkOut = vtkOutputParsing.convert( parsedOptions ) @@ -31,19 +30,23 @@ def convert( parsedOptions: dict[ str, Any ] ) -> Options: def fillSubparser( subparsers: _SubParsersAction[ Any ] ) -> None: """Fill the argument parser for the map MFD action. + Args: - subparsers: Subparsers from the main argument parser + subparsers: Subparsers from the main argument parser. """ p = subparsers.add_parser( "map-mfd", help="Compute MFD indicators and attach results to a VTU" ) addVtuInputFileArgument( p, required=False ) - p.add_argument( "--" + __IP, type=str, choices=("QTPFA", "BdLVM"), required=True ) - p.add_argument( "--" + __PERMEABILITY, type=str, default="Permeability", + p.add_argument( "--" + __IP, type=str, choices=( "QTPFA", "BdLVM" ), required=True ) + p.add_argument( "--" + __PERMEABILITY, + type=str, + default="Permeability", help="Name of the cell data array that stores permeability." ) vtkOutputParsing.fillVtkOutputSubparser( p ) def displayResults( options: Options, result: Result ) -> None: """Display the results of the MFD computation. + Args: options: The options used for the MFD computation. result: The results of the MFD computation. diff --git a/mesh-doctor/tests/test_MFD.py b/mesh-doctor/tests/test_MFD.py index 725b62128..44d24393b 100644 --- a/mesh-doctor/tests/test_MFD.py +++ b/mesh-doctor/tests/test_MFD.py @@ -1,74 +1,80 @@ import numpy as np import vtk +import pathlib from vtkmodules.util.numpy_support import vtk_to_numpy, numpy_to_vtk from geos.mesh.io.vtkIO import VtkOutput from geos.mesh_doctor.actions.mapMFD import add_cell_volumes, meshAction, Options -def __create_hex_grid(nx=2, ny=2, nz=2) -> vtk.vtkUnstructuredGrid: + +def __create_hex_grid( nx: int = 2, ny: int = 2, nz: int = 2 ) -> vtk.vtkUnstructuredGrid: """Creates a simple hexagonal grid with the specified number of cells in each direction.""" mesh = vtk.vtkUnstructuredGrid() points = vtk.vtkPoints() - for k in range(nz + 1): - for j in range(ny + 1): - for i in range(nx + 1): - points.InsertNextPoint(i / nx, j / ny, k / nz) - mesh.SetPoints(points) + for k in range( nz + 1 ): + for j in range( ny + 1 ): + for i in range( nx + 1 ): + points.InsertNextPoint( i / nx, j / ny, k / nz ) + mesh.SetPoints( points ) - def pid(i, j, k): - return k * (ny + 1) * (nx + 1) + j * (nx + 1) + i + def pid( i: int, j: int, k: int ) -> int: + return k * ( ny + 1 ) * ( nx + 1 ) + j * ( nx + 1 ) + i - for k in range(nz): - for j in range(ny): - for i in range(nx): + for k in range( nz ): + for j in range( ny ): + for i in range( nx ): ids = vtk.vtkIdList() for node in [ - pid(i, j, k), - pid(i + 1, j, k), - pid(i + 1, j + 1, k), - pid(i, j + 1, k), - pid(i, j, k + 1), - pid(i + 1, j, k + 1), - pid(i + 1, j + 1, k + 1), - pid(i, j + 1, k + 1), + pid( i, j, k ), + pid( i + 1, j, k ), + pid( i + 1, j + 1, k ), + pid( i, j + 1, k ), + pid( i, j, k + 1 ), + pid( i + 1, j, k + 1 ), + pid( i + 1, j + 1, k + 1 ), + pid( i, j + 1, k + 1 ), ]: - ids.InsertNextId(node) - mesh.InsertNextCell(vtk.VTK_HEXAHEDRON, ids) - return __add_permeability(mesh) + ids.InsertNextId( node ) + mesh.InsertNextCell( vtk.VTK_HEXAHEDRON, ids ) + return __add_permeability( mesh ) -def __add_permeability(mesh, permeability_value=1.0) -> vtk.vtkUnstructuredGrid: +def __add_permeability( mesh: vtk.vtkUnstructuredGrid, permeability_value: float = 1.0 ) -> vtk.vtkUnstructuredGrid: """Adds a simple permeability field to the mesh.""" - perm = np.ones((mesh.GetNumberOfCells(), 3)) * permeability_value - vtk_perm = numpy_to_vtk(perm, array_type=vtk.VTK_DOUBLE) - vtk_perm.SetName("Permeability") - mesh.GetCellData().AddArray(vtk_perm) + perm = np.ones( ( mesh.GetNumberOfCells(), 3 ) ) * permeability_value + vtk_perm = numpy_to_vtk( perm, array_type=vtk.VTK_DOUBLE ) + vtk_perm.SetName( "Permeability" ) + mesh.GetCellData().AddArray( vtk_perm ) return mesh -def test_mfd_indicators_hex_grid(tmp_path): - mesh = __create_hex_grid(1, 1, 1) - mesh = add_cell_volumes(mesh) - mesh = __add_permeability(mesh) - for ip in ["QTPFA", "BdLVM"]: +def test_mfd_indicators_hex_grid( tmp_path: pathlib.Path ) -> None: + """Test the MFD indicators on a simple hexagonal grid with known properties.""" + mesh = __create_hex_grid( 1, 1, 1 ) + mesh = add_cell_volumes( mesh ) + mesh = __add_permeability( mesh ) + + for ip in [ "QTPFA", "BdLVM" ]: out = tmp_path / f"mfd_{ip}.vtu" - options = Options(vtkOutput=VtkOutput(output=str(out), isDataModeBinary=True), ip=ip, permeability="Permeability") - meshAction(mesh, options) + options = Options( vtkOutput=VtkOutput( output=str( out ), isDataModeBinary=True ), + ip=ip, + permeability="Permeability" ) + meshAction( mesh, options ) - arr = mesh.GetCellData().GetArray(f"{ip}_Results") + arr = mesh.GetCellData().GetArray( f"{ip}_Results" ) assert arr is not None assert arr.GetNumberOfTuples() == mesh.GetNumberOfCells() assert out.exists() if ip == "QTPFA": - varr = vtk_to_numpy(arr)[0] - assert np.isclose(varr[0], 1) #conditioning - assert np.isclose(varr[2:3], 0.5) #spectrum - assert np.isclose(varr[4], 0, atol=1e-15) #orthogonality - assert np.isclose(varr[5], 0, atol=1e-15) #consistency + varr = vtk_to_numpy( arr )[ 0 ] + assert np.isclose( varr[ 0 ], 1 ) #conditioning + assert np.isclose( varr[ 2:3 ], 0.5 ) #spectrum + assert np.isclose( varr[ 4 ], 0, atol=1e-15 ) #orthogonality + assert np.isclose( varr[ 5 ], 0, atol=1e-15 ) #consistency elif ip == "BdLVM": - varr = vtk_to_numpy(arr)[0] - assert np.isclose(varr[0], 3) #conditioning - assert np.allclose(varr[2:4], np.array([1/6, 1/2]), atol=1e-15) #spectrum - assert np.isclose(varr[4], 0, atol=1e-15) #orthogonality - assert np.isclose(varr[5], 0, atol=1e-15) #consistency + varr = vtk_to_numpy( arr )[ 0 ] + assert np.isclose( varr[ 0 ], 3 ) #conditioning + assert np.allclose( varr[ 2:4 ], np.array( [ 1 / 6, 1 / 2 ] ), atol=1e-15 ) #spectrum + assert np.isclose( varr[ 4 ], 0, atol=1e-15 ) #orthogonality + assert np.isclose( varr[ 5 ], 0, atol=1e-15 ) #consistency From 2eeb4af36b99ae6b69f37a2808a1257996200518 Mon Sep 17 00:00:00 2001 From: jacques franc Date: Tue, 26 May 2026 15:47:21 +0200 Subject: [PATCH 5/7] connect to main interface --- mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py | 1 - mesh-doctor/src/geos/mesh_doctor/parsing/__init__.py | 1 + mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py | 3 ++- mesh-doctor/src/geos/mesh_doctor/register.py | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py index 7f2929d7e..ab56f5b7b 100644 --- a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -15,7 +15,6 @@ if sys.version_info >= ( 3, 11 ): from enum import StrEnum else: - class StrEnum( str, Enum ): """String enumeration base class for Python versions < 3.11.""" pass diff --git a/mesh-doctor/src/geos/mesh_doctor/parsing/__init__.py b/mesh-doctor/src/geos/mesh_doctor/parsing/__init__.py index ecdb80d9c..466438346 100644 --- a/mesh-doctor/src/geos/mesh_doctor/parsing/__init__.py +++ b/mesh-doctor/src/geos/mesh_doctor/parsing/__init__.py @@ -19,6 +19,7 @@ ORPHAN_2D = "orphan2d" CHECK_INTERNAL_TAGS = "checkInternalTags" EULER = "euler" +MAPMFD = "mapMFD" @dataclass( frozen=True ) diff --git a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py index 754a1d191..385309512 100644 --- a/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py +++ b/mesh-doctor/src/geos/mesh_doctor/parsing/mapMFDParsing.py @@ -5,6 +5,7 @@ from argparse import _SubParsersAction from typing import Any +from geos.mesh_doctor.parsing import MAPMFD from geos.mesh_doctor.parsing.cliParsing import setupLogger, addVtuInputFileArgument from geos.mesh_doctor.parsing import vtkOutputParsing from geos.mesh_doctor.actions.mapMFD import Options, Result @@ -34,7 +35,7 @@ def fillSubparser( subparsers: _SubParsersAction[ Any ] ) -> None: Args: subparsers: Subparsers from the main argument parser. """ - p = subparsers.add_parser( "map-mfd", help="Compute MFD indicators and attach results to a VTU" ) + p = subparsers.add_parser( MAPMFD, help="Compute MFD indicators and attach results to a VTU" ) addVtuInputFileArgument( p, required=False ) p.add_argument( "--" + __IP, type=str, choices=( "QTPFA", "BdLVM" ), required=True ) p.add_argument( "--" + __PERMEABILITY, diff --git a/mesh-doctor/src/geos/mesh_doctor/register.py b/mesh-doctor/src/geos/mesh_doctor/register.py index 99675376c..e33dc88c9 100644 --- a/mesh-doctor/src/geos/mesh_doctor/register.py +++ b/mesh-doctor/src/geos/mesh_doctor/register.py @@ -59,7 +59,7 @@ def registerParsingActions( parsing.FIX_ELEMENTS_ORDERINGS, parsing.GENERATE_CUBE, parsing.GENERATE_FRACTURES, parsing.GENERATE_GLOBAL_IDS, parsing.MAIN_CHECKS, parsing.NON_CONFORMAL, parsing.SELF_INTERSECTING_ELEMENTS, parsing.SUPPORTED_ELEMENTS, parsing.ORPHAN_2D, - parsing.CHECK_INTERNAL_TAGS, parsing.EULER ): + parsing.CHECK_INTERNAL_TAGS, parsing.EULER, parsing.MAPMFD ): __HELPERS[ actionName ] = actionName __ACTIONS[ actionName ] = actionName From fc384077d3cdbd634a372bc6f6c6a54629d2ca80 Mon Sep 17 00:00:00 2001 From: jacques franc Date: Wed, 27 May 2026 11:56:17 +0200 Subject: [PATCH 6/7] last fixes --- .../src/geos/mesh_doctor/actions/mapMFD.py | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py index ab56f5b7b..d766499a2 100644 --- a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -15,6 +15,7 @@ if sys.version_info >= ( 3, 11 ): from enum import StrEnum else: + class StrEnum( str, Enum ): """String enumeration base class for Python versions < 3.11.""" pass @@ -131,6 +132,9 @@ def filterVolumeCells( mesh: vtk.vtkDataSet ) -> vtk.vtkDataSet: ext.SetInputData( 0, mesh ) ext.SetInputData( 1, sel ) ext.Update() + + setupLogger.info( + f"Filtered {ext.GetOutput().GetNumberOfCells()}/{mesh.GetNumberOfCells()} volume cells from the input mesh." ) return ext.GetOutput() @@ -157,12 +161,14 @@ class ComputeMFD: def __init__( self, mesh: vtk.vtkDataSet, permeability_field: str = "Permeability" ) -> None: """Initializes the ComputeMFD instance by computing the faces, cell centers, and permeability field from the input mesh.""" - self.faces, self.face2cell = ComputeMFD.compute_newell( mesh ) - self.cell_centers = ComputeMFD.__compute_cell_centroids( mesh ) # make sure that we have always Volume and don't act on surfaces or lines - mesh = filterVolumeCells( mesh ) - mesh = add_cell_volumes( mesh ) + self.mesh = filterVolumeCells( mesh ) + self.mesh = add_cell_volumes( self.mesh ) + # compute faces and cell centers after filtering to ensure we only process volume cells + self.faces, self.face2cell = ComputeMFD.compute_newell( self.mesh ) + self.cell_centers = ComputeMFD.__compute_cell_centroids( self.mesh ) self.permeability_field = permeability_field + self.ip_type = IPType.QTPFA def set_IP( self, ip_type: IPType ) -> None: """Sets the interface pressure type for the MFD computation. @@ -172,7 +178,7 @@ def set_IP( self, ip_type: IPType ) -> None: """ self.ip_type = ip_type - def compute( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float, float, float, float, float ] ]: + def compute( self ) -> List[ Tuple[ float, float, float, float, float, float ] ]: """Computes the MFD indicators for the input mesh based on the specified interface pressure type and returns the results as a list of tuples containing the indicators for each cell. Args: @@ -180,10 +186,11 @@ def compute( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float, float, f """ # if self.ip_type == IPType.TPFA: # return self.compute_tpfa(mesh) + if self.ip_type == IPType.QTPFA: - return self.compute_quasitpfa( mesh ) + return self.compute_quasitpfa( self.mesh ) elif self.ip_type == IPType.BdLVM: - return self.compute_bdlvm( mesh ) + return self.compute_bdlvm( self.mesh ) else: raise ValueError( f"Unsupported IP type: {self.ip_type}" ) @@ -236,7 +243,6 @@ def compute_quasitpfa( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float perm = vtk_to_numpy( mesh.GetCellData().GetArray( self.permeability_field ) ) invperm = 1.0 / perm vol = vtk_to_numpy( mesh.GetCellData().GetArray( "Volume" ) ) - faces, self.face2cell = ComputeMFD.compute_newell( mesh ) cell2face: Dict[ int, List[ int ] ] = {} for k, v in self.face2cell.items(): @@ -246,9 +252,9 @@ def compute_quasitpfa( self, mesh: vtk.vtkDataSet ) -> List[ Tuple[ float, float cell_centers = ComputeMFD.__compute_cell_centroids( mesh ) ncells = len( cell_centers ) M = [ ( 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 ) ] * ncells - face_centers = np.array( [ faces[ k ].center for k in self.face2cell ] ) - face_normals = np.array( [ faces[ k ].normal for k in self.face2cell ] ) - face_area = np.array( [ faces[ k ].area for k in self.face2cell ] ) + face_centers = np.array( [ self.faces[ k ].center for k in self.face2cell ] ) + face_normals = np.array( [ self.faces[ k ].normal for k in self.face2cell ] ) + face_area = np.array( [ self.faces[ k ].area for k in self.face2cell ] ) def process_cell( cell: int ) -> Tuple[ int, float, float, float, float, float, float ]: face_indices = cell2face.get( cell, [] ) @@ -340,7 +346,7 @@ def __get_indicators( K: np.ndarray, def get_eigs( Mx: np.ndarray, Sx: np.ndarray ) -> np.ndarray: lambdas: np.ndarray = np.array( [] ) if compute_eigs: - lambdas, _ = np.linalg.eigvals( Mx + Sx ) + lambdas = np.linalg.eigvals( Mx + Sx ) return lambdas @@ -493,7 +499,7 @@ def meshAction( mesh: vtk.vtkDataSet, options: Options ) -> Result: except Exception as e: raise ValueError( f"Unsupported IP type: {options.ip}" ) from e - res = mfd.compute( mesh ) + res = mfd.compute() mesh = __attach_results( mesh, res, f"{options.ip}_Results" ) writeMesh( mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger ) From 8955927d8963d1377dcad5c817f629cde723b0fb Mon Sep 17 00:00:00 2001 From: jacques franc Date: Wed, 27 May 2026 16:18:24 +0200 Subject: [PATCH 7/7] fix --- .../src/geos/mesh_doctor/actions/mapMFD.py | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py index d766499a2..c9cafdd94 100644 --- a/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py +++ b/mesh-doctor/src/geos/mesh_doctor/actions/mapMFD.py @@ -12,6 +12,8 @@ from geos.mesh.io.vtkIO import writeMesh, VtkOutput, readUnstructuredGrid from geos.mesh_doctor.parsing.cliParsing import setupLogger +MAX_WORKER = 16 + if sys.version_info >= ( 3, 11 ): from enum import StrEnum else: @@ -276,9 +278,15 @@ def process_cell( cell: int ) -> Tuple[ int, float, float, float, float, float, from concurrent.futures import ThreadPoolExecutor, as_completed total = len( cell2face ) - with ThreadPoolExecutor( max_workers=8 ) as executor: + done = 0 + with ThreadPoolExecutor( max_workers=MAX_WORKER ) as executor: futures = [ executor.submit( process_cell, i ) for i in range( total ) ] - results = [ future.result() for future in as_completed( futures ) ] + # results = [ future.result() for future in as_completed( futures ) ] + results = [] + for future in as_completed( futures ): + results.append( future.result() ) + done += 1 + print( f"\rDone: {done}/{total} ({100*done/total:.1f}%)", end="" ) for cell, c, cr, lm, lM, idem, ortho in results: M[ cell ] = ( c, cr, lm, lM, idem, ortho ) @@ -326,9 +334,15 @@ def process_cell( cell: int ) -> Tuple[ int, float, float, float, float, float, from concurrent.futures import ThreadPoolExecutor, as_completed total = len( cell2face ) - with ThreadPoolExecutor( max_workers=8 ) as executor: + done = 0 + with ThreadPoolExecutor( max_workers=MAX_WORKER ) as executor: futures = [ executor.submit( process_cell, i ) for i in range( total ) ] - results = [ future.result() for future in as_completed( futures ) ] + # results = [ future.result() for future in as_completed( futures ) ] + results = [] + for future in as_completed( futures ): + results.append( future.result() ) + done += 1 + print( f"\rDone: {done}/{total} ({100*done/total:.1f}%)", end="" ) for cell, c, cr, lm, lM, idem, ortho in results: M[ cell ] = ( c, cr, lm, lM, idem, ortho ) @@ -500,9 +514,9 @@ def meshAction( mesh: vtk.vtkDataSet, options: Options ) -> Result: raise ValueError( f"Unsupported IP type: {options.ip}" ) from e res = mfd.compute() - mesh = __attach_results( mesh, res, f"{options.ip}_Results" ) + mfd.mesh = __attach_results( mfd.mesh, res, f"{options.ip}_Results" ) - writeMesh( mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger ) + writeMesh( mfd.mesh, options.vtkOutput, canOverwrite=True, logger=setupLogger ) return Result( info=f"MFD {options.ip} computed and written to {options.vtkOutput.output}" )