"""
Tools for assigning a problem space to a given model.
"""
import json
import warnings
from collections import defaultdict
from typing import Any, Dict, Generator, Iterable, Iterator, List, Mapping, Optional
import dolfin as fem
from mtnlion.cell import DomainInterface
from mtnlion.domain import Domain
from mtnlion.element import ElementSpace
from mtnlion.formula import ArgumentError, Arguments, FormMap, Formula
from mtnlion.model import Model
from mtnlion.variable import UVMap, UVMapSingleDomain, Variable
[docs]class FunctionManager:
"""
Manages the mesh, function space, trial function vector, and test function vector.
"""
# pylint: disable=too-many-instance-attributes
def __init__(self, mesh: fem.mesh, element_space: ElementSpace) -> None:
"""
Create an object to manage a given problem space.
:param mesh: Mesh on which the problem will be evaluated
:param element_space: Elements defining the function space
"""
self.element_space = element_space
self.mesh = mesh
self.mixed_element = fem.MixedElement(self.element_space.elements)
self.function_space = fem.FunctionSpace(self.mesh, self.mixed_element)
self.function_subspace = self._get_subspaces()
self.trial = fem.TrialFunction(self.function_space)
self.u_vec: List[fem.TrialFunction] = []
self.test = fem.TestFunction(self.function_space)
def _get_subspaces(self) -> List[fem.FiniteElement]:
"""
Fetch the subspaces in the mixed element function space
"""
return [self.function_space.sub(i).collapse() for i in range(len(self.element_space.elements))]
[docs] def function(self) -> fem.Function:
"""
Create a FEniCS function defined in the mixed element function space
"""
return fem.Function(self.function_space)
[docs] def assign(self, to_fenics: fem.Function, from_domain: Mapping[Any, Domain]) -> None:
"""
Assign a mixed element function from a Domain based variable.
:param to_fenics: Function to assign
:param from_domain: Domain variable to assign from
"""
for name, domains in from_domain.items():
for domain, funcs in domains.items():
if isinstance(funcs, list):
for index, func in zip(self.element_space.mapping[name][domain], funcs):
fem.assign(to_fenics.sub(index), func)
else:
fem.assign(to_fenics.sub(self.element_space.mapping[name][domain]), funcs)
[docs] def split(self, function: fem.Function) -> Mapping[Any, Domain]:
"""
Split a FEniCS function into a domain structure.
:param function: FEniCS function
"""
funcs = fem.split(function)
return {
name: Domain(
{
domain: [funcs[index] for index in indices] if isinstance(indices, list) else funcs[indices]
for domain, indices in domains.items()
}
)
for name, domains in self.element_space.mapping.items()
}
[docs] def get_subspace(self, variable_name: str, domain: str) -> List[fem.FunctionSpace]:
"""
Retrieve the subspace definition for a given variable, domain, and optionally index if the variable is an
approximation.
:param variable_name: name of the variable
:param domain: domain to retrieve
:return: Function Space
"""
element = self.element_space.mapping[variable_name][domain]
return [self.function_subspace[index] for index in element]
[docs] def initialize_from_constant(
self, variable_name: str, constants: Iterable[fem.Constant], domain: str, time_index: int = 0
) -> None:
"""
Initialize a variable from a constant value.
:param variable_name: Name of the variable
:param constants: List of constants, lenght should be 0 if the function is not an approximation
:param domain: Domain to apply the constant to
:param time_index: Time index to apply the constant to
:return: None
"""
init: Dict[str, Domain[str, fem.Function]] = {
variable_name: Domain(
{
domain: [
fem.interpolate(constant, self.get_subspace(variable_name, domain)[i])
for i, constant in enumerate(constants)
]
}
)
}
self.assign(self.u_vec[time_index], init)
[docs] def initialize_from_expression(
self, variable_name: str, expressions: Iterable[fem.Constant], domain: str, time_index: int = 0
) -> None:
"""
Initialize a variable from an expression.
:param variable_name: Name of the variable
:param expressions: List of constants, lenght should be 0 if the function is not an approximation
:param domain: Domain to apply the constant to
:param time_index: Time index to apply the constant to
:return: None
"""
init: Dict[str, Domain[str, fem.Function]] = {
variable_name: Domain(
{
domain: [
fem.project(expression, self.get_subspace(variable_name, domain)[i])
for i, expression in enumerate(expressions)
]
}
)
}
self.assign(self.u_vec[time_index], init)
[docs]class ElementAssigner:
"""
Provides a set of tools to generate and assign elements to variables in a given model.
"""
def __init__(self, model: Model):
self.model = model
[docs] def assign_elements(
self, element: fem.FiniteElement, domains: Iterable = (), names: Iterable = ()
) -> "ElementAssigner":
"""
Assign a given element to given domains or variable names.
:param element: FEniCS element to assign
:param domains: An iterable of domain names to assign
:param names: An iterable of variable names to assign
:return: `ElementAssigner`
"""
if domains and names:
raise AttributeError("Domain and name cannot be assigned simultaneously.")
for variable in self.model.variables:
self.update_element_map(variable, element, domains)
if names:
for name in names:
variable = self.model.variable_by_name(name)
self.update_element_map(variable, element, variable.domains)
return self
[docs] def check_assigned_elements(self) -> None:
"""
Check to make sure all variables have assigned elements.
:return: None
"""
for variable in self.model.variables:
if not isinstance(variable.uv_map, UVMap):
raise RuntimeError("Variables should not be reduced to a single domain.")
element_check = {
variable.name: {
domain: [function for function in functions if function is None]
for domain, functions in variable.uv_map.element_map.items() # type: ignore
}
for variable in self.model.variables
}
invalid_elements = {name: value for name, value in element_check.items() if any(map(len, value.values()))}
if invalid_elements:
raise RuntimeError(f"Not all elements have been assigned: {invalid_elements}")
[docs] def generate_elements(self) -> ElementSpace:
"""
Create the element space defined by the model and assigned elements.
:return: `ElementSpace`
"""
self.check_assigned_elements()
return ElementSpace(
{
variable.name: variable.uv_map.element_map for variable in self.model.variables # type: ignore
}
)
[docs] @staticmethod
def update_element_map(variable: Variable, element: fem.FiniteElement, domains: Iterable[str]) -> None:
"""
Update the element map for a given variable to use the given element on the specified domains.
:param variable: `Variable` to update
:param element: Element to assign
:param domains: Domains of the variable to assign
:return: None
"""
if isinstance(variable.uv_map, UVMapSingleDomain):
raise RuntimeError("Cannot update the element map using a single domain.")
for domain, elements in variable.uv_map.element_map.items():
if domain in domains:
variable.uv_map.element_map[domain] = [element] * len(elements)
[docs]class ProblemSpace(ElementAssigner):
"""
Defines the relationship between the model, domain, and temporal discretization schemes.
"""
def __init__(self, model: Model, domain, time_discretization):
super(ProblemSpace, self).__init__(model)
self.domain: DomainInterface = domain
self.function_manager: Optional[FunctionManager] = None
self.time_discretization = time_discretization
def _assign_variables_to_model(self) -> None:
"""
Create a solution vector given a temporal discretization and use the given test vector to assign each element
of the vectors to their respective variables.
:return: None
"""
if self.function_manager is None:
raise RuntimeError("Variables have not been generated!")
self.function_manager.u_vec = [
self.function_manager.function()
for _ in range(min(self.model.minimum_rothes_order, self.time_discretization.order) + 1)
]
u_data = [self.function_manager.split(f) for f in self.function_manager.u_vec]
v_data = self.function_manager.split(self.function_manager.test)
for variable in self.model.variables:
trial_function = [u[variable.name] for u in u_data]
test_function = v_data[variable.name]
variable.uv_map = UVMap(trial_function, test_function, variable.uv_map.element_map) # type: ignore
[docs] def generate_variables(self) -> "ProblemSpace":
"""
Given the element mapping defined by the variables in the model, generate the element space for the model and
use that element space to form the test and trial function vectors. The vectors are then split into individual
functions and assigned to variables.
:return: `ProblemSpace`
"""
self.function_manager = FunctionManager(self.domain.domain_data.mesh, self.generate_elements())
self._assign_variables_to_model()
return self
[docs]class ProblemSpaceAssembler:
"""
Handles the assembly of the FFL form from the definition of the model as it relates to the problem space.
"""
def __init__(self, problem_space: ProblemSpace, parameters, lambdas=()):
"""
Initialize a problem space.
"""
self.problem_space = problem_space
self.parameters = parameters
self.forms = {form.name: self._generate_form_map(form) for form in self.problem_space.model.formulas}
self.lambdas = lambdas
intersect = set(self.parameters.keys()).intersection(set(self.forms.keys()))
if intersect:
warnings.warn(f"Overriding parameters {intersect} with formulas.")
def _generate_form_map(self, form: Formula):
return {
domain: [FormMap(form, measure, domain) for measure in self.problem_space.domain.domains[domain]]
for domain in form.domains
}
def _formulate_if_unformulated(self, form_map):
for formula in form_map.formula.parameters:
if formula not in self.forms:
continue
# if form_map.eval_domain not in self.forms[formula]:
# raise KeyError(f"{formula} is not defined in the {form_map.eval_domain} required by {repr(form_map)}")
if any(fmap.formulation is None for fmap in self.forms[formula].get(form_map.eval_domain, ())):
try:
for fmap in self.forms[formula][form_map.eval_domain]:
self._assemble(fmap)
except KeyError:
pass
def _fetch_variables(self, form_map):
form_variables = (
self.problem_space.model.variable_by_name(variable) for variable in form_map.formula.variables
)
for variable in form_variables:
if variable.get_domain(form_map.eval_domain).is_defined or form_map.primary_domain is None:
yield variable.get_domain(form_map.eval_domain)
else:
yield variable.get_domain(form_map.primary_domain)
def _fetch_parameters(self, form_map):
def _fetch_formulas(formula):
for fmap in self.forms[formula].get(form_map.eval_domain, (None,)):
if fmap is None:
return UndefinedFormula(formula, form_map.eval_domain)
return fmap.formulation
form_parameters = (
_fetch_formulas(parameter) if parameter in self.forms else self.parameters[parameter]
for parameter in form_map.formula.parameters
)
for parameter in form_parameters:
if isinstance(parameter, Domain):
yield parameter.get(form_map.eval_domain, None)
else:
yield parameter
def _assemble(self, form_map: FormMap):
"""
Instantiate the formula defined in the FormMap (create the FFL expression).
:param form_map: A mapping between formula and formulation
"""
self._formulate_if_unformulated(form_map)
form = form_map.formula
arguments = Arguments(
self._fetch_variables(form_map),
self._fetch_parameters(form_map),
self.lambdas,
(self.problem_space.time_discretization for _ in form.time_discretizations),
)
try:
form_map.formulation = form.formulate(arguments, form_map.eval_domain)
except ArgumentError as excpt:
raise ArgumentError(f"{excpt} from formula {repr(form)}")
def __str__(self):
def replace(string, mapping):
for old, new in mapping.items():
string = string.replace(old, new)
return string
u_names = {str(func): f"u_{i}" for i, func in enumerate(self.problem_space.function_manager.u_vec)}
v_name = {str(self.problem_space.function_manager.split(self.problem_space.function_manager.test)): "v"}
indices = {
"[{}]".format(index): "{{{}[{}]}}".format(name, domain)
for name, domains in self.problem_space.function_manager.element_space.mapping.items()
for domain, index in domains.items()
if not isinstance(index, list)
}
indices.update(
{
"[{}]".format(index): "{{{}<{}>[{}]}}".format(name, i, domain)
for name, domains in self.problem_space.function_manager.element_space.mapping.items()
for domain, indices in domains.items()
if isinstance(indices, list)
for i, index in enumerate(indices)
}
)
forms = defaultdict(lambda: defaultdict(list))
for name, domains in self.forms.items():
for domain, formulations in domains.items():
for formulation in formulations:
string = replace(str(formulation.formulation), u_names)
string = replace(string, v_name)
string = replace(string, indices)
forms[name][domain].append(string)
return json.dumps(forms, sort_keys=True, indent=4)
@property
def primary_forms(self) -> Dict[str, Dict[str, List[FormMap]]]:
"""
Fetch the formulas that are not dependent on other forms.
:return: Mapping of formulas
"""
return {name: domains for name, domains in self.forms.items() if not any(self.form_dependents(name))}
@property
def secondary_forms(self) -> Dict[str, Dict[str, List[FormMap]]]:
"""
Fetch the formulas that are dependent on other forms.
:return: Mapping of formulas
"""
return {name: domains for name, domains in self.forms.items() if any(self.form_dependents(name))}