Source code for bdsim.blocks.functions

"""
Function blocks:

- have inputs and outputs
- have no state variables
- are a subclass of ``FunctionBlock`` |rarr| ``Block``

"""

# The constructor of each class ``MyClass`` with a ``@block`` decorator becomes a method ``MYCLASS()`` of the BlockDiagram instance.

import numpy as np
from scipy import linalg
import scipy.interpolate
import math
import inspect
import spatialmath.base as smb
from typing import Any, Union, Callable

ArrayLike = Union[np.ndarray, int, float, list, tuple]

from bdsim.components import FunctionBlock

# TODO:
# transform 3D points


[docs]class Sum(FunctionBlock): """ :blockname:`SUM` Summing junction. :inputs: N :outputs: 1 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - i - int, float, ndarray - :math:`x_i` * - Output - 0 - int, float, ndarray - :math:`\sum_i \pm x_i` Add or subtract input signals according to the ``signs`` string. The number of input ports is the length of this string. For example:: sum = bd.SUM('+-+') is a 3-input summing junction which computes ``in[0] - in[1] + in[2]``. :note: The signals must be compatible for addition, and if some are arrays they must be broadcastable. """ nin = -1 nout = 1 _modefuncs = { "r": lambda x: x, "c": smb.wrap_mpi_pi, "C": smb.wrap_0_2pi, "L": smb.wrap_0_pi, "l": smb.wrap_0_pi, }
[docs] def __init__(self, signs: str = "++", mode: str = None, **blockargs): """ :param signs: signs associated with input ports, accepted characters: + or -, defaults to "++" :type signs: str, optional :param mode: controls addition mode, per element, string comprises ``r`` or ``c`` or ``C`` or ``L``, defaults to None :type mode: str, optional :param blockargs: |BlockOptions| :type blockargs: dict ``mode`` controls how elements of the input vectors are added/subtracted. Elements which are angles must be treated specially, and this is indicated by the corresponding characters in ``mode``. The string's length must equal the width of the input vectors. The characters of the string can be: ============== ============================================ mode character purpose ============== ============================================ r real number, don't wrap (default) c angle on circle, wrap to [-π, π) C angle on circle, wrap to [0, 2π) L colatitude angle, wrap to [0, π] ============== ============================================ For example if ``mode="rc"`` then a 2-element array would have its second element wrapped to the range [-π, π). """ super().__init__(nin=len(signs), **blockargs) assert isinstance(signs, str), "first argument must be signs string" assert all([x in "+-" for x in signs]), "invalid sign" self.signs = signs self.mode = mode
def output(self, t, inports, x): for i, input in enumerate(inports): # code makes no assumption about types of inputs # NOTE: use sum = sum =/- input rather than sum +/-= input since # these are references if self.signs[i] == "-": if i == 0: sum = -input else: sum = sum - input else: if i == 0: sum = input else: sum = sum + input if self.mode is not None: if isinstance(sum, np.ndarray): # sum is an array if sum.ndim == 1: if len(self.mode) != len(sum): raise ValueError("length of mode string doesn't match") sum = np.array( [self._modefuncs[m](x) for (m, x) in zip(self.mode, sum)] ) elif sum.ndim == 2: if len(self.mode) != sum.shape[0]: raise ValueError( "length of mode string doesn't match number of rows" ) out = [] for col in sum.T: out.append( [self._modefuncs[m](x) for (m, x) in zip(self.mode, col)] ) sum = np.array(out).T else: raise ValueError("expecting 1D or 2D array") else: # sum is a scalar sum = self._modefuncs[self.mode[0]](sum) return [sum]
# ------------------------------------------------------------------------ #
[docs]class Prod(FunctionBlock): r""" :blockname:`PROD` Product junction. :inputs: N :outputs: 1 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - i - int, float, ndarray - :math:`x_i` * - Output - 0 - int, float, ndarray - :math:`\prod_i \{x_i | \frac{1}{x_i}\}` Multiply or divide input signals according to the ``ops`` string. The number of input ports is the length of this string. For example:: prod = PROD('*/*') is a 3-input product junction which computes ``in[0] / in[1] * in[2]``. :note: By default the ``*`` and ``/`` operators are used which perform element-wise operations. :note: The option ``matrix`` will instead use ``@`` and ``@ np.linalg.inv()``. The shapes of matrices must conform. A matrix on a ``/`` input must be square and non-singular. Matrices are multiplied in ascending port order. """ nin = -1 nout = 1
[docs] def __init__(self, ops: str = "**", matrix: bool = False, **blockargs): """ :param ops: operations associated with input ports, accepted characters: * or /, defaults to '**' :type ops: str, optional :param inputs: Optional incoming connections :type inputs: Block or Plug :param matrix: Arguments are matrices, defaults to False :type matrix: bool, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(nin=len(ops), **blockargs) assert isinstance(ops, str), "first argument must be signs string" assert all([x in "*/" for x in ops]), "invalid op" self.ops = ops self.matrix = matrix
def output(self, t, inports, x): for i, input in enumerate(inports): if i == 0: if self.ops[i] == "*": prod = input else: if self.matrix: prod = np.linalg.inv(input) prod = 1.0 / input else: if self.ops[i] == "*": if self.matrix: prod = prod @ input else: prod = prod * input else: if self.matrix: prod = prod @ np.linalg.inv(input) else: prod = prod / input return [prod]
# ------------------------------------------------------------------------ #
[docs]class Gain(FunctionBlock): """ :blockname:`GAIN` Gain block. :inputs: 1 :outputs: 1 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - 0 - int, float, ndarray - :math:`x` * - Output - 0 - int, float, ndarray - :math:`K x` Either or both the input and gain can be Numpy arrays and Numpy will compute the appropriate product :math:`u K`. If :math:`u` and ``K`` are both NumPy arrays the ``@`` operator is used and :math:`u` is postmultiplied by the gain. To premultiply by the gain, to compute :math:`K u` use the ``premul`` option. For example:: gain = bd.GAIN(2.5) """ nin = 1 nout = 1
[docs] def __init__( self, K: Union[int, float, np.ndarray] = 1, premul: bool = False, **blockargs ): """ :param K: The gain value, defaults to 1 :type K: scalar, array_like :param premul: premultiply by constant, default is postmultiply, defaults to False :type premul: bool, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(**blockargs) self.K = K self.premul = premul self.add_param("K")
def output(self, t, inports, x): input = inports[0] if isinstance(input, np.ndarray) and isinstance(self.K, np.ndarray): # array x array case if self.premul: # premultiply by gain return [self.K @ input] else: # postmultiply by gain return [input @ self.K] else: return [input * self.K]
# ------------------------------------------------------------------------ #
[docs]class Pow(FunctionBlock): """ :blockname:`POW` Power block. :inputs: 1 :outputs: 1 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - 0 - int, float, ndarray - :math:`x` * - Output - 0 - int, float, ndarray - :math:`x^p` If the input is a Numpy array the result depends on ``matrix``. If ``matrix`` is False the block performs an elementwise exponentiation and the result is a Numpy array of the same size. If ``matrix`` is True and the input is a square matrix and ``p`` is an integer then matrixwise exponentiation is performedand using repeated matrix multiplication and matrix inversion. If ``p`` is not an integer it uses SciPy to compute the power using an eigenvalue decomposition. For example:: pow = bd.POW(2) :seealso: :func:`numpy.linalg.matrix_power` :func:`scipy.linalg.fractional_matrix_power` """ nin = 1 nout = 1
[docs] def __init__(self, p: Union[int, float] = 1, matrix: bool = False, **blockargs): """ :param p: The exponent value, defaults to 1 :type p: scalar :param matrix: premultiply by constant, default is postmultiply, defaults to False :type matrix: bool, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(**blockargs) self.p = p self.matrix = matrix self.add_param("p")
def output(self, t, inports, x): input = inports[0] if isinstance(input, np.ndarray): # input is an array if self.matrix: # matrixwise exponentiation if isinstance(self.p, int): # integer case return [np.linalg.matrix_power(input, self.p)] else: # general fractional case return [linalg.fractional_matrix_power(input, self.p)] else: # elementwise exponentiation return [input**self.p] else: return [input**self.p]
# ------------------------------------------------------------------------ #
[docs]class Clip(FunctionBlock): r""" :blockname:`CLIP` Signal clipping. :inputs: 1 :outputs: 1 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - 0 - int, float, ndarray - :math:`x` * - Output - 0 - int, float, ndarray - :math:`\min(\max(x,a), b)` The input signal is clipped to the range from ``minimum`` to ``maximum`` inclusive. The signal can be a 1D-array in which case each element is clipped. The minimum and maximum values can be: - a scalar, in which case the same value applies to every element of the input vector , or - a 1D-array, of the same shape as the input vector that applies elementwise to the input vector. For example:: clip = bd.CLIP(-1, 1) """ nin = 1 nout = 1
[docs] def __init__( self, min: ArrayLike = -math.inf, max: ArrayLike = math.inf, **blockargs ): """ :param min: Minimum value, defaults to -math.inf :type min: scalar or array_like, optional :param max: Maximum value, defaults to math.inf :type max: float or array_like, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(**blockargs) self.min = min self.max = max
def output(self, t, inports, x): input = inports[0] if isinstance(input, np.ndarray): out = np.clip(input, self.min, self.max) else: out = min(self.max, max(input, self.min)) return [out]
# ------------------------------------------------------------------------ # # TODO can have multiple outputs: pass in a tuple of functions, return a tuple
[docs]class Function(FunctionBlock): r""" :blockname:`FUNCTION` Python function. :inputs: N :outputs: M :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - i - any - :math:`x_i` * - Output - j - any - :math:`f_j(x_0, \ldots, x_{N-1})` Inputs to the block are passed as separate arguments to the function. Programmatic ositional or keyword arguments can also be passed to the function. A block with one output port that sums its two input ports is:: func = bd.FUNCTION(lambda u1, u2: u1+u2, nin=2) A block with a function that takes two inputs and has two additional arguments:: def myfun(u1, u2, param1, param2): pass FUNCTION(myfun, nin=2, args=(p1,p2)) If we need access to persistent (static) data, to keep some state:: def myfun(u1, u2, param1, param2, state): pass func = bd.FUNCTION(myfun, nin=2, args=(p1,p2), persistent=True) where a dictionary is passed in as the last argument and which is kept from call to call. A block with a function that takes two inputs and additional keyword arguments:: def myfun(u1, u2, param1=1, param2=2, param3=3, param4=4): pass func = bd.FUNCTION(myfun, nin=2, kwargs=dict(param2=7, param3=8)) A block with two inputs and two outputs, the outputs are defined by two lambda functions with the same inputs:: FUNCTION( [ lambda x, y: x_t, lambda x, y: x* y]) A block with two inputs and two outputs, the outputs are defined by a single function which returns a list:: def myfun(u1, u2): return [ u1+u2, u1*u2 ] func = bd.FUNCTION( myfun, nin=2, nout=2) """ nin = -1 nout = -1
[docs] def __init__( self, func: Callable = None, nin: int = 1, nout: int = 1, persistent: bool = False, fargs: list = None, fkwargs: dict = None, **blockargs, ): """ :param func: function or lambda, or list thereof, defaults to None :type func: callable or sequence of callables, optional :param nin: number of inputs, defaults to 1 :type nin: int, optional :param nout: number of outputs, defaults to 1 :type nout: int, optional :param persistent: pass in a reference to a dictionary instance to hold persistent state, defaults to False :type persistent: bool, optional :param fargs: extra positional arguments passed to the function, defaults to [] :type fargs: list, optional :param fkwargs: extra keyword arguments passed to the function, defaults to {} :type fkwargs: dict, optional :param blockargs: |BlockOptions| :type blockargs: dict, optional """ if func is None: raise ValueError("function is not defined") super().__init__(nin=nin, nout=nout, **blockargs) if fargs is None: fargs = list() if fkwargs is None: fkwargs = dict() # TODO, don't know why this happens if len(fargs) > 0 and fargs[0] == {}: fargs = [] if isinstance(func, (list, tuple)): for f in func: assert callable(f), "Function must be a callable" if fkwargs is None: # we can check the number of arguments n = len(inspect.signature(func).parameters) if persistent: n -= 1 # discount dict if used if nin + len(fargs) != n: raise ValueError( f"argument count mismatch: function has {n} args," f" dict={dict}, nin={nin}" ) elif callable(func): if len(fkwargs) == 0: # we can check the number of arguments n = len(inspect.signature(func).parameters) if persistent: n -= 1 # discount dict if used if nin + len(fargs) != n: raise ValueError( f"argument count mismatch: function has {n} args, dict={dict}," f" nin={nin}" ) # self.nout = nout self.func = func if persistent: self.userdata = dict() fargs += (self.userdata,) else: self.userdata = None self.args = fargs self.kwargs = fkwargs
def start(self, simstate): super().start(simstate) if self.userdata is not None: self.userdata.clear() print("clearing user data") def output(self, t, inports, x): if callable(self.func): # single function try: val = self.func(*inports, *self.args, **self.kwargs) except TypeError: raise RuntimeError( "Function invocation failed, check number of arguments" ) from None if isinstance(val, (list, tuple)): if len(val) != self.nout: raise RuntimeError( "Function returns wrong number of arguments: " + str(self) ) return val else: if self.nout != 1: raise RuntimeError( "Function returns wrong number of arguments: " + str(self) ) return [val] else: # list of functions out = [] for f in self.func: try: val = f(*inports, *self.args, **self.kwargs) except TypeError: raise RuntimeError( "Function invocation failed, check number of arguments" ) from None out.append(val) return out
# ------------------------------------------------------------------------ #
[docs]class Interpolate(FunctionBlock): """ :blockname:`INTERPOLATE` Interpolate signal. :inputs: 1 :outputs: 1 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - 0 - int, float - :math:`x` * - Output - 0 - float - :math:`f(x)` Interpolate a scalar function of a scalar. A simple triangle function with domain [0,10] and range [0,1] can be defined by:: interp = bd.INTERPOLATE(x=(0,5,10), y=(0,1,0)) We might also express this as a list of 2D-coordinates:: interp = bd.INTERPOLATE(xy=[(0,0), (5,1), (10,0)]) .. plot:: import matplotlib.pyplot as plt plt.plot([0, 5, 10], [0, 1, 0], lw=2) plt.grid(True) plt.xlabel("in[0]") plt.ylabel("out[0]") The data can also be expressed as Numpy arrays. If that is the case, the interpolation function can be vector valued. ``x`` has a shape of (N,1) and ``y`` has a shape of (N,M). Alternatively ``xy`` has a shape of (N,M+1) and the first column is the x-data. :note: if ``time=True``. In this case the block has no input ports and is a ``Source`` not a ``Function`` block. :seealso: :func:`scipy.interpolate.interp1d` """ nin = -1 nout = 1
[docs] def __init__( self, x: Union[list, tuple, np.ndarray] = None, y: Union[list, tuple, np.ndarray] = None, xy: np.ndarray = None, time: bool = False, kind: str = "linear", **blockargs, ): """ :param x: x-values of function, defaults to None :type x: array_like, shape (N,) optional :param y: y-values of function, defaults to None :type y: array_like, optional :param xy: combined x- and y-values of function, defaults to None :type xy: array_like, optional :param time: x new is simulation time, defaults to False :type time: bool, optional :param kind: interpolation method, defaults to 'linear' :type kind: str, optional :param blockargs: |BlockOptions| :type blockargs: dict """ self.time = time if time: nin = 0 self.blockclass = "source" else: nin = 1 super().__init__(nin=nin, **blockargs) if xy is None: # process separate x and y vectors x = np.array(x) y = np.array(y) assert x.shape[0] == y.shape[0], "x and y data must be same length" else: # process mixed xy data if isinstance(xy, (list, tuple)): x = [_[0] for _ in xy] y = [_[1] for _ in xy] # x = np.array(x).T # y = np.array(y).T print(x, y) elif isinstance(xy, np.ndarray): x = xy[:, 0] y = xy[:, 1:] self.f = scipy.interpolate.interp1d(x=x, y=y, kind=kind, axis=0) self.x = x
def start(self, simstate, **blockargs): super().start(simstate) if simstate is not None: if self.time: assert self.x[0] >= 0, "interpolation not defined for t<0" if self.x[-1] is np.inf: self.x[-1] = simstate.T assert self.x[-1] >= simstate.T, "interpolation not defined for t>T" def output(self, t, inports, x): if self.time: xnew = t else: xnew = inports[0] return [self.f(xnew)]
if __name__ == "__main__": # pragma: no cover from pathlib import Path exec( open(Path(__file__).parent.parent.parent / "tests" / "test_functions.py").read() )