"""
COMSOL Data Handling.
This module is designed to load 1D solution data from a Gu & Wang reference model from COMSOL as
CSV files.
The idea is that CSV files take a long time to load, so it is more efficient to convert the data to a binary
(npz) format before processing.
COMSOL now saves it's data as a 2D matrix, however it still only uses repeated x values when the boundary solves for
different values on either side. In order to normalize the repeated bounds, all bounds are check to ensure they've got
repeated x values, such that the y values are duplicated.
"""
import logging
import os
from typing import Dict, List, Mapping, MutableMapping, Tuple, Union
import dolfin as fem
import munch
import numpy as np
from scipy import interpolate
from mtnlion import deprecated_domain, deprecated_engine
from mtnlion.domain import Domain
from mtnlion.tools import loader
LOGGER = logging.getLogger(__name__)
[docs]def fix_boundaries(mesh: np.ndarray, data: np.ndarray, boundaries: Union[float, List[int], np.ndarray]) -> np.ndarray:
"""
Adjust COMSOL's interpretation of two-sided boundaries.
When COMSOL outputs data from the reference model there are two solutions at every internal boundary, which causes
COMSOL to have repeated domain values; one for the right and one for the left of the boundary. If there is only one
internal boundary on the variable mesh at a given time, then a duplicate is added.
:param mesh: x data to use to correct the y data
:param data: in 2D, this would be the y data
:param boundaries: internal boundaries
:return: normalized boundaries to be consistent
"""
msg = "Fixing boundaries: {}".format(boundaries)
LOGGER.debug(msg)
b_indices = np.searchsorted(mesh, boundaries)
if not b_indices.any():
return data
for index in b_indices[::-1]:
if mesh[index] != mesh[index + 1]: # add boundary
LOGGER.debug("Adding boundary, copying %f at index %d", mesh[index], index)
data = np.insert(data, index, data[index], axis=0)
return data
[docs]def remove_dup_boundary(data: deprecated_domain.ReferenceCell, item: np.ndarray) -> Union[np.ndarray, None]:
"""
Remove points at boundaries where two values exist at the same coordinate, favor electrodes over separator.
:param data: data in which to reference the mesh and separator indices from
:param item: item to apply change to
:return: Array of points with interior boundaries removed
"""
LOGGER.info("Removing duplicate boundaries")
mask = np.ones(item.shape[-1], dtype=bool)
mask[[data.sep_ind.start, data.sep_ind.stop - 1]] = False
return item[..., mask]
[docs]def get_standardized(cell: deprecated_domain.ReferenceCell) -> Union[deprecated_engine.Mountain, None]:
"""
Convert COMSOL solutions to something more easily fed into FEniCS (remove repeated coordinates at boundaries).
:param cell: reference cell to remove double boundary values from
:return: Simplified solution cell
"""
LOGGER.info("Retrieving FEniCS friendly solution cell")
return cell.filter_space([slice(0, len(cell.mesh))], func=lambda x: remove_dup_boundary(cell, x))
# mesh = remove_dup_boundary(cell, cell.mesh)
# new_data = {k: remove_dup_boundary(cell, v) for k, v in cell.data.items()}
# return domain.ReferenceCell(mesh, cell.time_mesh, cell.boundaries, **new_data)
# TODO generalize the formatting of data for mesh name and arbitrary dimensions
[docs]def load(filename: str) -> deprecated_engine.Mountain:
"""
Load COMSOL reference cell from formatted npz file.
:param filename: name of the npz file
:return: ReferenceCell
"""
file_data = loader.load_numpy_file(filename)
return deprecated_domain.ReferenceCell.from_dict(file_data)
[docs]def adimensionalize_comsol_data(
comsol_data: deprecated_domain.ReferenceCell, mesh: np.ndarray
) -> Mapping[str, Mapping[str, np.ndarray]]:
"""
Separate a one dimensional (in time) set of cell data into three [0-1] domains.
:param comsol_data: data to adimensionalize
:param mesh: destination mesh
"""
def make_dict(data, axis=0):
out = Domain()
if ~np.isnan(data[:, comsol_data.neg_ind]).any():
out.update({"anode": interpolate.interp1d(comsol_data.neg, data[:, comsol_data.neg_ind], axis=axis)(mesh)})
if ~np.isnan(data[:, comsol_data.sep_ind]).any():
out.update(
{"separator": interpolate.interp1d(comsol_data.sep, data[:, comsol_data.sep_ind], axis=axis)(mesh + 1)}
)
if ~np.isnan(data[:, comsol_data.pos_ind]).any():
out.update(
{"cathode": interpolate.interp1d(comsol_data.pos, data[:, comsol_data.pos_ind], axis=axis)(mesh + 2)}
)
return out
data = {k: make_dict(v, 1) for k, v in comsol_data.data.items()}
return data
[docs]def interp_time(
time: np.ndarray, adim_data: Mapping[str, Mapping[str, np.ndarray]]
) -> Dict[str, Dict[str, interpolate.interp1d]]:
"""
Return one dimensional interpolation functions corresponding to a dictionary of dictionaries.
:param time: Times at which the data is sampled
:param adim_data: Data to interpolate
"""
data = {k: {k1: interpolate.interp1d(time, v1, axis=0) for k1, v1 in v.items()} for k, v in adim_data.items()}
return data
[docs]def comsol_preprocessor(
comsol_data: deprecated_domain.ReferenceCell, mesh: np.ndarray
) -> Dict[str, Dict[str, interpolate.interp1d]]:
"""
Pre-process COMSOL data to make it more useful in simulations.
:param comsol_data: Data to preprocess
:param mesh: New solution mesh (adimensionalized)
"""
data = adimensionalize_comsol_data(comsol_data, mesh)
data = interp_time(comsol_data.time_mesh, data)
return data
[docs]def collect_parameters(
params: Mapping[str, Mapping[str, float]]
) -> Tuple[Mapping[str, Mapping[str, float]], Mapping[str, float]]:
"""
Translate deprecated domains into current domain.
:param params: parameters
"""
n_dict: MutableMapping[str, Domain[str, float]] = {}
keys = set()
for key, value in params.items():
if key != "const":
keys.update(list(value.keys()))
for key in keys:
n_dict[key] = Domain()
if params["neg"].get(key, None) is not None:
n_dict[key].update({"anode": params["neg"][key]})
if params["sep"].get(key, None) is not None:
n_dict[key].update({"separator": params["sep"][key]})
if params["pos"].get(key, None) is not None:
n_dict[key].update({"cathode": params["pos"][key]})
return n_dict, params["const"]
[docs]class FenicsFunctions:
"""
Handle the assignment of raw COMSOL data to FEniCS functions.
"""
# pylint: disable=too-few-public-methods
def __init__(self, data: Mapping[str, Domain[str, np.ndarray]], function_space: fem.FunctionSpace):
"""
Initialize FEniCS functions from COMSOL data
:param data: COMSOL data
:param function_space: Function space for the FEniCS functions
"""
self.data = munch.munchify(data)
self.function_space = function_space
self.funcs: MutableMapping[str, Mapping[str, fem.Function]] = {}
for fname, fval in data.items():
self.funcs[fname] = {k: fem.Function(function_space) for k in fval.keys()}
def _set(self, dest, source):
if isinstance(source, np.ndarray):
dest.vector()[:] = source[fem.dof_to_vertex_map(self.function_space)].astype("double")
[docs] def update(self, time):
"""
Update the FEniCS function values for the current time. Interpolates the data as required.
:param time: Time at which to assign the function
"""
for fname, domains in self.funcs.items():
for key, value in domains.items():
self._set(value, self.data[fname][key](time))