diff --git a/simulai/residuals/_pytorch_residuals.py b/simulai/residuals/_pytorch_residuals.py
index b6a38f3..36c1176 100644
--- a/simulai/residuals/_pytorch_residuals.py
+++ b/simulai/residuals/_pytorch_residuals.py
@@ -29,9 +29,8 @@ class SymbolicOperator(torch.nn.Module):
-    """The SymbolicOperatorClass is a class that constructs tensor operators using symbolic expressions written in PyTorch.
-
-
+    """The SymbolicOperatorClass constructs tensor operators using symbolic
+    expressions written in PyTorch.
     Returns:
         object: An instance of the SymbolicOperatorClass.
     """
@@ -52,14 +51,18 @@ def __init__(
         device: str = "cpu",
         engine: str = "torch",
         auxiliary_expressions: list = None,
+        special_expressions: list = None,
     ) -> None:
         if engine == "torch":
             super(SymbolicOperator, self).__init__()
         else:
             pass

+        # The engine used to build the expressions.
+        # Usually PyTorch.
         self.engine = importlib.import_module(engine)

+        # Basic attributes
         self.constants = constants

         if trainable_parameters is not None:
@@ -72,9 +75,23 @@ def __init__(
         self.processing = processing
         self.periodic_bc_protected_key = "periodic"

-        self.protected_funcs = ["cos", "sin", "sqrt", "exp", "tanh", "cosh", "sech", "sinh"]
-        self.protected_operators = ["L", "Div", "Identity", "Kronecker"]
-
+        # Special functions that must be replaced before compilation
+        self.protected_funcs = [
+            "cos",
+            "sin",
+            "sqrt",
+            "exp",
+            "tanh",
+            "cosh",
+            "sech",
+            "sinh",
+        ]
+
+        # Special operators that must be replaced before compilation
+        self.protected_operators = ["L", "Div", "Grad", "Identity", "Kronecker"]
+
+        # Replacing special functions and operators with the corresponding
+        # classes and objects
         self.protected_funcs_subs = self._construct_protected_functions()
         self.protected_operators_subs = self._construct_implict_operators()
@@ -103,6 +120,8 @@ def __init__(
         else:
             self.auxiliary_expressions = auxiliary_expressions

+        self.special_expressions = special_expressions
+
         self.input_vars = [self._parse_variable(var=var) for var in input_vars]
         self.output_vars = [self._parse_variable(var=var) for var in output_vars]
@@ -127,8 +146,11 @@ def __init__(

         self.output = None

-        self.f_expressions = list()
-        self.g_expressions = dict()
+        self.f_expressions = list()  # Main expressions, such as PDEs and ODEs
+        self.g_expressions = dict()  # Auxiliary expressions, such as boundary conditions
+        self.h_expressions = (
+            list()
+        )  # Other auxiliary expressions, such as those used to evaluate special loss functions

         self.feed_vars = None
@@ -152,10 +174,13 @@ def __init__(
         else:
             gradient_function = gradient

+        # The Diff symbol is mapped to the automatic differentiation function
         subs = {self.diff_symbol.name: gradient_function}
         subs.update(self.external_functions)
         subs.update(self.protected_funcs_subs)
+        subs.update(self.protected_operators_subs)

+        # Compiling expressions to tensor operators
         for expr in self.expressions:
             if not callable(expr):
                 f_expr = sympy.lambdify(self.all_vars, expr, subs)
@@ -164,6 +189,7 @@ def __init__(

             self.f_expressions.append(f_expr)

+        # Auxiliary expressions (usually boundary conditions)
         if self.auxiliary_expressions is not None:
             for key, expr in self.auxiliary_expressions.items():
                 if not callable(expr):
@@ -173,16 +199,40 @@ def __init__(

                 self.g_expressions[key] = g_expr

+        # Special expressions (usually employed for certain kinds of loss functions)
+        if special_expressions is not None:
+            for expr in self.special_expressions:
+                if not callable(expr):
+                    h_expr = sympy.lambdify(self.all_vars, expr, subs)
+                else:
+                    h_expr = expr
+
+                self.h_expressions.append(h_expr)
+
+        self.process_special_expression = self._factory_process_expression_serial(
+            expressions=self.h_expressions
+        )
+
         # Method for executing the expressions evaluation
         if self.processing == "serial":
-            self.process_expression = self._process_expression_serial
+            self.process_expression = self._factory_process_expression_serial(
+                expressions=self.f_expressions
+            )
         else:
             raise Exception(f"Processing case {self.processing} not supported.")

-    def _construct_protected_functions(self):
-        """This function creates a dictionary of protected functions from the engine object attribute.
+    def _subs_expr(self, expr=None, constants=None):
+        """Substitute constants into a single expression or into a list of expressions."""
+        if isinstance(expr, list):
+            for j, e in enumerate(expr):
+                expr[j] = e.subs(constants)
+        else:
+            expr = expr.subs(constants)
+
+        return expr
+
+    def _construct_protected_functions(self):
+        """This function creates a dictionary of protected functions from the engine object attribute.
         Returns:
             dict: A dictionary of function names and their corresponding function objects.
         """
@@ -203,8 +253,6 @@ def _construct_protected_functions(self):

     def _construct_implict_operators(self):
         """This function creates a dictionary of protected operators from the operators engine module.
-
-
         Returns:
             dict: A dictionary of operator names and their corresponding function objects.
         """
@@ -284,18 +332,15 @@ def _collect_data_from_inputs_list(self, inputs_list: dict = None) -> list:

     def _parse_expression(self, expr=Union[sympy.Expr, str]) -> sympy.Expr:
         """Parses the input expression and returns a SymPy expression.
-
         Args:
-            expr (Union[sympy.Expr, str], optional, optional): The expression to parse, by default None. It can either be a SymPy expression or a string.
-
+            expr (Union[sympy.Expr, str], optional): The expression to parse, by default None.
+                It can either be a SymPy expression or a string.
         Returns:
             sympy.Expr: The parsed SymPy expression.
-
         Raises:
             Exception: If the `constants` attribute is not defined, and the input expression is a string.
-
-
         """
+
         if isinstance(expr, str):
             try:
                 expr_ = sympify(
@@ -303,9 +348,12 @@ def _parse_expression(self, expr=Union[sympy.Expr, str]) -> sympy.Expr:
                 )

                 if self.constants is not None:
-                    expr_ = expr_.subs(self.constants)
+                    expr_ = self._subs_expr(expr=expr_, constants=self.constants)
                 if self.trainable_parameters is not None:
-                    expr_ = expr_.subs(self.trainable_parameters)
+                    expr_ = self._subs_expr(
+                        expr=expr_, constants=self.trainable_parameters
+                    )
+
             except ValueError:
                 if self.constants is not None:
                     _expr = expr
@@ -327,14 +375,13 @@ def _parse_expression(self, expr=Union[sympy.Expr, str]) -> sympy.Expr:

     def _parse_variable(self, var=Union[sympy.Symbol, str]) -> sympy.Symbol:
         """Parse the input variable and return a SymPy Symbol.
-
         Args:
-            var (Union[sympy.Symbol, str], optional, optional): The input variable, either a SymPy Symbol or a string. (Default value = Union[sympy.Symbol, str])
-
+            var (Union[sympy.Symbol, str], optional): The input variable, either a SymPy Symbol or a string.
+                (Default value = Union[sympy.Symbol, str])
         Returns:
             sympy.Symbol: A SymPy Symbol representing the input variable.
-
         """
+
         if isinstance(var, str):
             return sympy.Symbol(var)
         else:
@@ -342,74 +389,66 @@ def _parse_variable(self, var=Union[sympy.Symbol, str]) -> sympy.Symbol:

     def _forward_tensor(self, input_data: torch.Tensor = None) -> torch.Tensor:
         """Forward the input tensor through the function.
-
         Args:
             input_data (torch.Tensor, optional): The input tensor. (Default value = None)
-
         Returns:
             torch.Tensor: The output tensor after forward pass.
-
         """
+
         return self.function.forward(input_data=input_data)

     def _forward_dict(self, input_data: dict = None) -> torch.Tensor:
         """Forward the input dictionary through the function.
-
         Args:
             input_data (dict, optional): The input dictionary. (Default value = None)
-
         Returns:
             torch.Tensor: The output tensor after forward pass.
-
         """
-        return self.function.forward(**input_data)
-
-    def _process_expression_serial(self, feed_vars: dict = None) -> List[torch.Tensor]:
-        """Process the expression list serially using the given feed variables.
-
-        Args:
-            feed_vars (dict, optional): The feed variables. (Default value = None)
+        return self.function.forward(**input_data)

-        Returns:
-            List[torch.Tensor]: A list of tensors after evaluating the expressions serially.
+    def _factory_process_expression_serial(self, expressions: list = None):
+        def _process_expression_serial(feed_vars: dict = None) -> List[torch.Tensor]:
+            """Process the expression list serially using the given feed variables.
+            Args:
+                feed_vars (dict, optional): The feed variables. (Default value = None)
+            Returns:
+                List[torch.Tensor]: A list of tensors after evaluating the expressions serially.
+            """

-        """
-        return [f(**feed_vars).to(self.device) for f in self.f_expressions]
+            return [f(**feed_vars).to(self.device) for f in expressions]

-    def _process_expression_individual(
-        self, index: int = None, feed_vars: dict = None
-    ) -> torch.Tensor:
-        """Evaluates a single expression specified by index from the f_expressions list with given feed variables.
+        return _process_expression_serial

-        Args:
-            index (int, optional): Index of the expression to be evaluated, by default None
-            feed_vars (dict, optional): Dictionary of feed variables, by default None
+    def _factory_process_expression_individual(self, expressions: list = None):
+        def _process_expression_individual(
+            index: int = None, feed_vars: dict = None
+        ) -> torch.Tensor:
+            """Evaluates a single expression specified by index from the expressions list with given feed variables.
+            Args:
+                index (int, optional): Index of the expression to be evaluated, by default None
+                feed_vars (dict, optional): Dictionary of feed variables, by default None
+            Returns:
+                torch.Tensor: Result of evaluating the specified expression with given feed variables
+            """

-        Returns:
-            torch.Tensor: Result of evaluating the specified expression with given feed variables
+            return expressions[index](**feed_vars).to(self.device)

-        """
-        return self.f_expressions[index](**feed_vars).to(self.device)
+        return _process_expression_individual

-    def __call__(
+    def _create_input_for_eval(
         self, inputs_data: Union[np.ndarray, dict] = None
     ) -> List[torch.Tensor]:
         """Evaluate the symbolic expression.
-
         This function takes either a numpy array or a dictionary of numpy arrays as input.
-
         Args:
             inputs_data (Union[np.ndarray, dict], optional): Union (Default value = None)
-
         Returns:
             List[torch.Tensor]: List[torch.Tensor]: A list of tensors containing the evaluated expressions.
-
-        Raises:
-            Raises: does: not match with the inputs_key attribute
-
         """
+
         constructor = MakeTensor(
             input_names=self.input_names, output_names=self.output_names
         )
@@ -443,6 +482,23 @@ def __call__(
                 for inputs_list"
             )

+        return outputs, inputs
+
+    def __call__(
+        self, inputs_data: Union[np.ndarray, dict] = None
+    ) -> List[torch.Tensor]:
+        """Evaluate the symbolic expression.
+        This function takes either a numpy array or a dictionary of numpy arrays as input.
+        Args:
+            inputs_data (Union[np.ndarray, dict], optional): The input data used to evaluate the expressions. (Default value = None)
+        Returns:
+            List[torch.Tensor]: A list of tensors containing the evaluated expressions.
+        Raises:
+            Exception: If the keys of `inputs_data` do not match the `inputs_key` attribute.
+        """
+
+        outputs, inputs = self._create_input_for_eval(inputs_data=inputs_data)
+
         feed_vars = {**outputs, **inputs}

         # It returns a list of tensors containing the expressions
@@ -451,11 +507,9 @@ def __call__(
     def eval_expression(self, key, inputs_list):
         """This function evaluates an expression stored in the class attribute 'g_expressions' using the inputs in 'inputs_list'. If the expression has a periodic boundary condition, the function evaluates the expression at the lower and upper boundaries and returns the difference. If the inputs are provided as a list, they are split into individual tensors and stored in a dictionary with the keys as the input names. If the inputs are provided as an np.ndarray, they are converted to tensors and split along the second axis. If the inputs are provided as a dict, they are extracted using the 'inputs_key' attribute. The inputs, along with the outputs obtained from running the function, are then passed as arguments to the expression using the 'g(**feed_vars)' syntax.
-
         Args:
             key (str): the key used to retrieve the expression from the 'g_expressions' attribute
             inputs_list (list): either a list of arrays, an np.ndarray, or a dict containing the inputs to the function
-
         Returns:
             the result of evaluating the expression using the inputs.:
@@ -558,8 +612,8 @@ def eval_expression(self, key, inputs_list):
             assert (
                 self.inputs_key is not None
             ), "If inputs_list is dict, \
-                it is necessary to provide\
-                a key."
+                                it is necessary to provide\
+                                a key."

             inputs = {
                 key: value
@@ -581,11 +635,9 @@ def eval_expression(self, key, inputs_list):
     @staticmethod
     def gradient(feature, param):
         """Calculates the gradient of the given feature with respect to the given parameter.
-
         Args:
             feature (torch.Tensor): Tensor with the input feature.
             param (torch.Tensor): Tensor with the parameter to calculate the gradient with respect to.
-
         Returns:
             torch.Tensor: Tensor with the gradient of the feature with respect to the given parameter.
         Example:
@@ -608,10 +660,8 @@ def gradient(feature, param):

     def jac(self, inputs):
         """Calculates the Jacobian of the forward function of the model with respect to its inputs.
-
         Args:
             inputs (torch.Tensor): Tensor with the input data to the forward function.
-
         Returns:
             torch.Tensor: Tensor with the Jacobian of the forward function with respect to its inputs.
         Example:
@@ -631,20 +681,20 @@ def sech(self, x):
         cosh = getattr(self.engine, "cosh")

-        return 1/cosh(x)
+        return 1 / cosh(x)

     def csch(self, x):
         sinh = getattr(self.engine, "sinh")

-        return 1/sinh(x)
+        return 1 / sinh(x)

     def coth(self, x):
         cosh = getattr(self.engine, "cosh")
         sinh = getattr(self.engine, "sinh")

-        return cosh(x)/sinh(x)
+        return cosh(x) / sinh(x)


 def diff(feature: torch.Tensor, param: torch.Tensor) -> torch.Tensor:
diff --git a/simulai/tokens.py b/simulai/tokens.py
index 3641ced..ca6e1cf 100644
--- a/simulai/tokens.py
+++ b/simulai/tokens.py
@@ -93,6 +93,38 @@ def Div(u: sympy.Symbol, vars: tuple) -> callable:
     return l
+
+def Grad(u: sympy.Symbol, vars: tuple) -> list:
+    """
+    Apply the gradient operator to a symbolic expression.
+
+    The gradient operator is a first-order differential operator that maps a
+    scalar field to the collection of its first-order partial derivatives with
+    respect to the given variables.
+
+    Parameters
+    ----------
+    u : sympy.Symbol
+        The scalar field (symbolic expression) to compute the gradient of.
+    vars : tuple
+        A tuple of variables to compute the gradient with respect to.
+
+    Returns
+    -------
+    list
+        A list containing the derivative of `u` with respect to each variable
+        in `vars`.
+
+    Examples
+    --------
+    >>> x, y = sympy.symbols('x y')
+    >>> u = sympy.Function('u')(x, y)
+    >>> g = Grad(u, (x, y))  # [D(u, x), D(u, y)]
+    """
+    g = [D(u, var) for var in vars]
+
+    return g
+

 def Gp(
     g0: Union[torch.tensor, float], r: Union[torch.tensor, float], n: int
diff --git a/tests/residuals/test_symbolicoperator.py b/tests/residuals/test_symbolicoperator.py
index 4a410d7..48d6d42 100644
--- a/tests/residuals/test_symbolicoperator.py
+++ b/tests/residuals/test_symbolicoperator.py
@@ -226,6 +226,42 @@ def test_symbolic_operator_diff_operators(self):

         assert all([isinstance(item, torch.Tensor) for item in residual(data)])

+    def test_symbolic_operator_grad_operator(self):
+        f = "D(u, x) - D(u, y)"
+        s_1 = "D(D(u, x) - D(u, y), x)"
+        s_2 = "D(D(u, x) - D(u, y), y)"
+
+        input_labels = ["x", "y"]
+        output_labels = ["u"]
+
+        L_x = 1
+        L_y = 1
+        N_x = 100
+        N_y = 100
+        dx = L_x / N_x
+        dy = L_y / N_y
+
+        grid = np.mgrid[0:L_x:dx, 0:L_y:dy]
+
+        data = np.hstack([grid[1].flatten()[:, None], grid[0].flatten()[:, None]])
+
+        net = model(n_inputs=len(input_labels), n_outputs=len(output_labels))
+
+        residual = SymbolicOperator(
+            expressions=[f],
+            special_expressions=[s_1, s_2],
+            input_vars=input_labels,
+            output_vars=output_labels,
+            function=net,
+            engine="torch",
+        )
+
+        u = net(input_data=data)
+        outputs, inputs = residual._create_input_for_eval(inputs_data=data)
+        feed_vars = {**outputs, **inputs}
+
+        assert all(
+            isinstance(item, torch.Tensor)
+            for item in residual.process_special_expression(feed_vars)
+        )
+
     def test_symbolic_operator_1d_pde(self):
         # Allen-Cahn equation
         f_0 = "D(u, t) - mu*D(D(u, x), x) + alpha*(u**3) + beta*u"
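
Usage sketch (illustrative, mirroring the added test): expressions passed through `expressions` are compiled into `process_expression` and evaluated via the operator call, while the new `special_expressions` are compiled into `process_special_expression` and evaluated with the same feed variables. The `model` network factory and the random collocation points below are assumptions for the example, not part of the patch.

# Minimal sketch of the special_expressions path (assumes a `model` helper
# like the one defined in tests/residuals/test_symbolicoperator.py)
import numpy as np

from simulai.residuals import SymbolicOperator

f = "D(u, x) - D(u, y)"            # main residual expression
s_1 = "D(D(u, x) - D(u, y), x)"    # special expressions, e.g. for extra loss terms
s_2 = "D(D(u, x) - D(u, y), y)"

net = model(n_inputs=2, n_outputs=1)  # hypothetical network constructor

residual = SymbolicOperator(
    expressions=[f],
    special_expressions=[s_1, s_2],
    input_vars=["x", "y"],
    output_vars=["u"],
    function=net,
    engine="torch",
)

data = np.random.rand(1000, 2)  # (x, y) collocation points

# Main expressions are evaluated through __call__ / process_expression
main_residuals = residual(data)

# Special expressions reuse the same feed variables and are evaluated
# through the compiled process_special_expression closure
outputs, inputs = residual._create_input_for_eval(inputs_data=data)
special_residuals = residual.process_special_expression({**outputs, **inputs})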