Source code for bdsim.blocks.connections

"""
Connection blocks are in two categories:

1. Signal manipulation:
    - have inputs and outputs
    - have no state variables
    - are a subclass of ``FunctionBlock`` |rarr| ``Block``
2. Subsystem support
    - have inputs or outputs
    - have no state variables
    - are a subclass of ``SubsystemBlock`` |rarr| ``Block``

"""

# The constructor of each class ``MyClass`` with a ``@block`` decorator becomes a method ``MYCLASS()`` of the BlockDiagram instance.

import importlib.util
import numpy as np
import copy

import bdsim
from bdsim.components import SubsystemBlock, SourceBlock, SinkBlock, FunctionBlock

# ------------------------------------------------------------------------ #
class Item(FunctionBlock):
    """
    :blockname:`ITEM`

    Pick a single named entry out of a dictionary signal.

    :inputs: 1
    :outputs: 1
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - 0
            - dict
            - ``D``
        *   - Output
            - 0
            - any
            - ``D[i]``

    The block expects its input signal to be a dictionary and forwards the
    value stored under the configured key to its output.  For example::

        item = bd.ITEM("xd")

    forwards the ``xd`` entry of the incoming dictionary signal.

    The behaviour resembles a demultiplexer :class:`DeMux`, except that the
    entries are named and may be of heterogeneous type.  A dictionary signal
    can play a role similar to a "bus" in Simulink(R).

    :seealso: :class:`Dict`
    """

    nin = 1
    nout = 1

    def __init__(self, item, **blockargs):
        """
        :param item: name of dictionary item
        :type item: str
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        super().__init__(**blockargs)
        # key looked up in the input dictionary on every evaluation
        self.item = item

    def output(self, t, inports, x):
        """
        Return the selected dictionary entry as the single output value.

        The input on port 0 must be a ``dict`` containing ``self.item``,
        otherwise an ``AssertionError`` is raised.
        """
        # TODO, handle inputs that are vectors themselves
        signal = inports[0]
        key = self.item
        assert isinstance(signal, dict), "Input signal must be a dict"
        assert key in signal, "Item is not in input dict"
        return [signal[key]]
class Dict(FunctionBlock):
    """
    :blockname:`DICT`

    Create a dictionary signal.

    :inputs: N
    :outputs: 1
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - i
            - any
            - :math:`x_i`
        *   - Output
            - 0
            - dict
            - ``{key: x[i] for i, key in enumerate(keys)}``

    Inputs are assigned to a dictionary signal, using the corresponding names
    from ``keys``.  For example::

        dd = bd.DICT(["x", "xd", "xdd"])

    expects three inputs and assigns them to dictionary items ``x``, ``xd``,
    ``xdd`` of the output dictionary respectively.

    This is somewhat like a multiplexer :class:`Mux` but allows for named
    heterogeneous data.  A dictionary signal can serve a similar purpose to a
    "bus" in Simulink(R).

    :seealso: :class:`Item` :class:`Mux`
    """

    # The number of inputs is variable: one input port per dictionary key.
    nin = -1
    nout = 1

    def __init__(self, keys, **blockargs):
        """
        :param keys: list of dictionary keys, one per input port
        :type keys: list
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        # Size the block from the key list, the same way Mux forwards its
        # ``nin`` argument to the base class.
        super().__init__(nin=len(keys), **blockargs)
        self.keys = keys

    def output(self, t, inports, x):
        """
        Return a one-element list holding the dictionary built from the inputs.

        Input port ``i`` is stored under ``self.keys[i]``.
        """
        # Like every other block in this module, wrap the single output value
        # in a list (one entry per output port).
        return [{key: inports[i] for i, key in enumerate(self.keys)}]
# ------------------------------------------------------------------------ #
class Mux(FunctionBlock):
    r"""
    :blockname:`MUX`

    Multiplex signals.

    :inputs: N
    :outputs: 1
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - i
            - float, ndarray
            - :math:`x_i`
        *   - Output
            - 0
            - ndarray
            - :math:`[x_0 \ldots x_{N-1}]`

    This block takes a number of scalar or 1D-array signals and concatenates
    them into a single 1-D array signal.  For example::

        mux = bd.MUX(2)

    :seealso: :class:`DeMux` :class:`Dict`
    """

    # TODO could be generalized to creating a list of non numeric data
    nin = -1
    nout = 1

    def __init__(self, nin=1, **blockargs):
        """
        :param nin: Number of input ports, defaults to 1
        :type nin: int, optional
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        super().__init__(nin=nin, **blockargs)

    def output(self, t, inports, x):
        """
        Concatenate all inputs, in port order, into one 1-D NumPy array.

        Python and NumPy scalars contribute one element each; NumPy arrays,
        lists and tuples are flattened and contribute all of their elements.

        :raises TypeError: if an input is of a type that cannot be multiplexed
        """
        # TODO, handle inputs that are vectors themselves
        out = []
        for port, value in enumerate(inports):
            if isinstance(value, (int, float, bool, np.generic)):
                # np.generic covers NumPy scalars such as np.int64, which are
                # not instances of the builtin int and were previously dropped
                out.append(value)
            elif isinstance(value, (np.ndarray, list, tuple)):
                out.extend(np.asarray(value).flatten().tolist())
            else:
                # Silently skipping an input would yield a vector of the wrong
                # width, so fail loudly instead.
                raise TypeError(
                    f"Mux input {port} has unsupported type {type(value).__name__}"
                )
        return [np.array(out)]
# ------------------------------------------------------------------------ #
class DeMux(FunctionBlock):
    """
    :blockname:`DEMUX`

    Split an iterable signal into separate output signals.

    :inputs: 1
    :outputs: N
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - 0
            - iterable
            - :math:`x`
        *   - Output
            - i
            - any
            - :math:`x_i`

    The block has one input port and ``nout`` output ports.  The input must be
    an iterable with exactly ``nout`` elements; element ``i`` is routed to
    output port ``i``.  When the input is a 1D Numpy array, each output port
    carries one element of that array.

    :seealso: :class:`Mux`
    """

    nin = 1
    nout = -1

    def __init__(self, nout=1, **blockargs):
        """
        :param nout: number of outputs, defaults to 1
        :type nout: int, optional
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        super().__init__(nout=nout, **blockargs)

    def output(self, t, inports, x):
        """
        Return the elements of the input signal, one per output port.

        An ``AssertionError`` is raised when the length of the input differs
        from ``self.nout``.
        """
        # TODO, handle inputs that are vectors themselves
        signal = inports[0]
        width_matches = len(signal) == self.nout
        assert width_matches, "Input width not equal to number of output ports"
        return [element for element in signal]
# ------------------------------------------------------------------------ #
class Index(FunctionBlock):
    """
    :blockname:`INDEX`

    Index an iterable signal.

    :inputs: 1
    :outputs: 1
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - i
            - iterable
            - :math:`x`
        *   - Output
            - j
            - iterable
            - :math:`x_i`

    The specified element(s) of the input iterable (list, string, etc.)
    are output.  The index can be an integer, sequence of integers, a Python
    slice object, or a string with Python slice notation, eg. ``"::-1"``.

    :seealso: :class:`Slice1` :class:`Slice2`
    """

    nin = 1
    nout = 1

    def __init__(self, index=None, **blockargs):
        """
        Index an iterable signal.

        :param index: elements of input array, defaults to an empty selection
        :type index: int, list, slice or str, optional
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        super().__init__(**blockargs)

        if index is None:
            # fresh list per instance rather than a shared mutable default
            index = []
        elif isinstance(index, str):
            # "a:b:c" -> slice(a, b, c); an empty field means None
            args = [None if a == "" else int(a) for a in index.split(":")]
            index = slice(*args)
        self.index = index

    def output(self, t, inports, x):
        """
        Return the selected element(s) of the input as the single output.

        - integer or slice index: the result of ``input[index]``
        - one-element sequence: the single selected element
        - longer (or empty) sequence: the selected elements, as an ndarray if
          the input is an ndarray, otherwise as a list
        """
        signal = inports[0]
        index = self.index

        # Integers and slices have no len() and are applied directly.
        if isinstance(index, (slice, int, np.integer)):
            return [signal[index]]

        if len(index) == 1:
            return [signal[index[0]]]

        selected = [signal[i] for i in index]
        if isinstance(signal, np.ndarray):
            return [np.array(selected)]
        return [selected]
# ------------------------------------------------------------------------ #
class SubSystem(SubsystemBlock):
    """
    :blockname:`SUBSYSTEM`

    Instantiate a subsystem.

    :inputs: N
    :outputs: M
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - i
            - any
            - :math:`x_i`
        *   - Output
            - j
            - any
            - :math:`y_j`

    This block represents a subsystem in a block diagram.  The definition
    of the subsystem can be:

    - the name of a module which is imported and must contain only one
      ``BlockDiagram`` instance, or
    - a ``BlockDiagram`` instance

    The referenced block diagram must contain one or both of:

    - one ``InPort`` block, which has outputs but no inputs. These
      outputs are connected to the inputs to the enclosing ``SubSystem`` block.
    - one ``OutPort`` block, which has inputs but no outputs. These
      inputs are connected to the outputs to the enclosing ``SubSystem`` block.

    .. note::

        - The referenced block diagram is treated like a macro and copied into
          the parent block diagram at compile time. The ``SubSystem``,
          ``InPort`` and ``OutPort`` blocks are eliminated, that is, all
          hierarchical structure is lost.
        - The same subsystem can be used multiple times, its blocks and wires
          will be cloned.  Subsystems can also include subsystems.
        - The number of input and output ports is not specified, they are
          computed from the number of ports on the ``InPort`` and ``OutPort``
          blocks within the subsystem.
    """

    nin = -1
    nout = -1

    def __init__(self, subsys, nin=1, nout=1, **blockargs):
        """
        :param subsys: Subsystem as either a module name or a ``BlockDiagram`` instance
        :type subsys: str or BlockDiagram
        :param nin: Number of input ports, defaults to 1
        :type nin: int, optional
        :param nout: Number of output ports, defaults to 1
        :type nout: int, optional
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        :raises ImportError: the module cannot be imported, or it does not
            contain exactly one ``BlockDiagram`` instance
        :raises ValueError: ``subsys`` is neither a string nor a
            ``BlockDiagram``, or the diagram has no, or more than one,
            ``INPORT``/``OUTPORT`` block
        """
        super().__init__(**blockargs)

        if isinstance(subsys, str):
            # attempt to import the module; a failure must stop construction
            # here, otherwise ``module`` is unbound in the code that follows
            try:
                module = importlib.import_module(subsys, package=".")
            except SyntaxError as err:
                raise ImportError(
                    f"syntax error in block definition: {subsys}"
                ) from err
            except ModuleNotFoundError as err:
                raise ImportError(f"module not found: {subsys}") from err

            # get all the bdsim.BlockDiagram instances
            simvars = [
                name
                for name, ref in module.__dict__.items()
                if isinstance(ref, bdsim.BlockDiagram)
            ]
            if len(simvars) == 0:
                raise ImportError("no bdsim.Simulation instances in imported module")
            elif len(simvars) > 1:
                raise ImportError(
                    "multiple bdsim.Simulation instances in imported module"
                    + str(simvars)
                )
            subsys = module.__dict__[simvars[0]]
            self.ssvar = simvars[0]
        elif isinstance(subsys, bdsim.BlockDiagram):
            # use an in-memory diagram
            self.ssvar = None
        else:
            raise ValueError("argument must be filename or BlockDiagram instance")

        # check if valid input and output ports
        ninp = sum(1 for b in subsys.blocklist if b.type == "inport")
        noutp = sum(1 for b in subsys.blocklist if b.type == "outport")

        if ninp > 1:
            raise ValueError("subsystem cannot have more than one INPORT block")
        if noutp > 1:
            raise ValueError("subsystem cannot have more than one OUTPORT block")
        if ninp + noutp == 0:
            raise ValueError("subsystem cannot have zero INPORT or OUTPORT blocks")

        # it's valid, make a deep copy so this instance owns its own blocks
        self.subsystem = copy.deepcopy(subsys)

        # get references to the input and output port blocks of the copy
        self.inport = None
        self.outport = None
        for b in self.subsystem.blocklist:
            if b.type == "inport":
                self.inport = b
            elif b.type == "outport":
                self.outport = b

        self.ssname = subsys.name

        # NOTE(review): these are the counts of INPORT/OUTPORT *blocks* (0 or
        # 1 each), not the number of ports on them, and the ``nin``/``nout``
        # arguments are not used -- confirm this is what the compiler expects.
        self.nin = ninp
        self.nout = noutp
# ------------------------------------------------------------------------ #
class InPort(SubsystemBlock):
    """
    :blockname:`INPORT`

    Entry point for signals coming into a subsystem.

    :inputs: 0
    :outputs: N
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Output
            - j
            - any
            - :math:`y_j`

    This block links a subsystem to its parent block diagram: whatever is
    wired to the inputs of the parent-level ``SubSystem`` block shows up on
    the outputs of this block.

    .. note:: A block diagram may contain only one ``INPORT`` block, but that
        block can have multiple ports.  Simulink(R) differs in that it would
        require multiple single-port input blocks.
    """

    nin = 0
    nout = -1

    def __init__(self, nout=1, **blockargs):
        """
        :param nout: Number of output ports, defaults to 1
        :type nout: int, optional
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        super().__init__(nout=nout, **blockargs)

    def output(self, t, inports, x):
        """Pass the received port values straight through, unchanged."""
        # signal feed through
        passthrough = inports
        return passthrough
# ------------------------------------------------------------------------ #
class OutPort(SubsystemBlock):
    """
    :blockname:`OUTPORT`

    Exit point for signals leaving a subsystem.

    :inputs: N
    :outputs: 0
    :states: 0

    .. list-table::
        :header-rows: 1

        *   - Port type
            - Port number
            - Types
            - Description
        *   - Input
            - i
            - any
            - :math:`x_i`

    This block links a subsystem to its parent block diagram: whatever is
    wired to the inputs of this block shows up on the outputs of the
    parent-level ``SubSystem`` block.

    .. note:: A block diagram may contain only one ``OUTPORT`` block, but that
        block can have multiple ports.  Simulink(R) differs in that it would
        require multiple single-port output blocks.
    """

    nin = -1
    nout = 0

    def __init__(self, nin=1, **blockargs):
        """
        :param nin: Number of input ports, defaults to 1
        :type nin: int, optional
        :param blockargs: |BlockOptions|
        :type blockargs: dict
        """
        super().__init__(nin=nin, **blockargs)

    def output(self, t, inports, x):
        """Pass the received port values straight through, unchanged."""
        # signal feed through
        passthrough = inports
        return passthrough
if __name__ == "__main__":  # pragma: no cover
    from pathlib import Path

    # When run as a script, execute the companion unit tests that live in the
    # repository's ``tests`` directory (three levels above this file).
    test_script = Path(__file__).parent.parent.parent / "tests" / "test_connections.py"

    # read_text() opens and closes the file itself, so no handle is leaked
    exec(test_script.read_text())