"""
Tools for access and storage of simulation solutions.
"""
import collections
from typing import Dict, List, Mapping, Optional
import dolfin as fem
import numpy as np
import ufl.algebra # type: ignore
from ufl.core.expr import Expr # type: ignore
import mtnlion.tools.helpers
from mtnlion.domain import Domain
from mtnlion.problem_space import ProblemSpaceAssembler
from mtnlion.tools.helpers import interp_time2, set_domain_data
[docs]class Solution:
"""
This class provides an interface for converting FEniCS functions into numpy-based solutions
"""
# TODO: fix pylint
# pylint: disable=too-many-instance-attributes
# pylint: disable=no-member
def __init__(self, problem_space: ProblemSpaceAssembler, save_list: List[str], dae_space: fem.FunctionSpace):
"""
Initialize the storage object using the problem space, a list of functions to save, and the function space for
any DAEs.
:param problem_space: The problem space
:param save_list: List of functions to save
:param dae_space: Function space for DAEs
"""
self.dae_space = dae_space
self.problem_space = problem_space
self.available_daes: Dict[str, Domain[str, Expr]] = {
name: Domain({domain: data[0].formulation for domain, data in form.items()})
for name, form in self.problem_space.secondary_forms.items()
}
if self.problem_space.problem_space.function_manager is None:
raise RuntimeError("Variables have not been generated!")
self.trial_functions = {
k: domain
for k, domain in self.problem_space.problem_space.function_manager.element_space.mapping.items()
if k in save_list
}
self.daes = {k: domain for k, domain in self.available_daes.items() if k in save_list}
self.solutions: Optional[Mapping[str, Domain[str, np.ndarray]]] = None
self.num_time_steps = 0
self.mesh = np.squeeze(self.problem_space.problem_space.function_manager.mesh.coordinates()) # type: ignore
self.time = np.empty(0)
self.iterations = 0
if self.problem_space.problem_space.function_manager is None:
raise RuntimeError("Solutions not initialized!")
def _init_solution(self, length: int) -> None:
"""
Initialize the solution storage. If called after the solution storage has been initialized, the length will be
appended to the solution.
:param length: Length of the solution (usually time)
"""
self.num_time_steps = length
if self.solutions is None:
self.time = np.empty((length,), dtype=np.float)
save_list = collections.ChainMap(self.trial_functions, self.daes)
coordinates = self.problem_space.problem_space.function_manager.mesh.coordinates() # type: ignore
self.solutions = {
name: set_domain_data(
*mtnlion.tools.helpers.create_solution_matrices(length, len(coordinates), len(domains))
)
for name, domains in save_list.items()
}
else:
self.time = np.append(self.time, np.empty((self.time.shape[0],), dtype=np.float), axis=0)
for name, domains in self.solutions.items():
for domain, value in domains.items():
self.solutions[name][domain] = np.append(
mtnlion.tools.helpers.create_solution_matrices(length - value.shape[0], value.shape[1], 1)[0],
self.solutions[name][domain],
axis=0,
)
[docs] def get_1d(self, function: fem.Function, all_funcs=False) -> Dict[str, Domain[str, np.ndarray]]:
"""
Retrieve the one dimensional values from the given mixed element space function.
:param function:
:param all_funcs:
:return:
"""
# funcs = function.split(True)
if self.problem_space.problem_space.function_manager is None:
raise RuntimeError("Variables have not been generated!")
if all_funcs:
save_list = list(self.problem_space.problem_space.function_manager.element_space.mapping)
else:
save_list = list(self.trial_functions)
mapping = self.problem_space.problem_space.function_manager.element_space.mapping # type: ignore
return {
name: Domain(
{
domain: [mtnlion.tools.helpers.get_1d(function.sub(index, True)) for index in indices]
if isinstance(indices, list)
else mtnlion.tools.helpers.get_1d(function.sub(indices, True))
if domain != "any"
else function.sub(indices, True).vector().get_local()
for domain, indices in domains.items()
}
)
for name, domains in mapping.items()
if name in save_list
}
[docs] def project(self, function: Mapping[str, Domain[str, ufl.algebra.Expr]]):
"""
Project an expression onto the DAE space
:param function: Non mixed-element function
"""
return {
name: mtnlion.tools.helpers.get_1d(fem.project(data, self.dae_space)) for name, data in function.items()
}
[docs] def set_solution_time_steps(self, num_time_steps: int) -> None:
"""
Initialize the solution storage for the specified number of iterations
:param num_time_steps: number of iterations in the simulation
"""
self._init_solution(num_time_steps)
[docs] def save_solution(self, iteration: int, time: float):
"""
Save the state of the solution vector given the iteration and current time.
:param iteration: current simulation iteration
:param time: time at the same iteration
"""
if self.solutions is None:
raise RuntimeError("Solutions not initialized!")
self.time[iteration] = time
self.iterations = max(self.iterations, iteration)
solutions = self.get_1d(self.problem_space.problem_space.function_manager.u_vec[0]) # type: ignore
solutions = {
name: Domain({domain: value[0] for domain, value in domains.items()}) for name, domains in solutions.items()
}
solutions.update({k: self.project(domain) for k, domain in self.daes.items()})
for name, domains in self.solutions.items():
domains[iteration, :] = solutions[name]
[docs] @staticmethod
def interp_time(time: np.ndarray, domain: Mapping[str, Domain[str, np.ndarray]]):
"""
Create an interpolation function for each of the solutions in storage
:param time: The times at which to
:param domain: Data to interpolate
:return: Interpolation functions for the solution
"""
return {name: interp_time2(time, domains) for name, domains in domain.items()}