Source code for bdsim.blocks.sinks

"""
Sink blocks:

- have inputs but no outputs
- have no state variables
- are a subclass of ``SinkBlock`` |rarr| ``Block``
- that perform graphics are a subclass of  ``GraphicsBlock`` |rarr| ``SinkBlock`` |rarr| ``Block``

"""


import numpy as np
from math import pi, sqrt, sin, cos, atan2

import matplotlib.pyplot as plt
from matplotlib.pyplot import Polygon

from bdsim.components import SinkBlock

# ------------------------------------------------------------------------ #


[docs]class Print(SinkBlock): """ :blockname:`PRINT` Print signal. :inputs: 1 :outputs: 0 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - 0 - any - :math:`x` Creates a console print block which displays the value of the input signal at each simulation time step. The display format is like:: PRINT(print.0 @ t=0.100) [-1.0 0.2] and includes the block name, time, and the formatted value. The numerical formatting of the signal is controlled by ``fmt``: - if not provided, ``str()`` is used to format the signal - if provided: - a scalar is formatted by the ``fmt.format()`` - a NumPy array is formatted by ``fmt.format()`` applied to every element Examples:: bd.PRINT(name="X") # block name appears in the printed text bd.PRINT(fmt="{:.1f}") # print with explicit format .. note:: - By default writes to stdout - The output is cleaner if progress bar printing is disabled using the ``-p`` command line option. """ nin = 1 nout = 0
[docs] def __init__(self, fmt=None, file=None, **blockargs): """ :param fmt: Format string, defaults to None :type fmt: str, optional :param file: file to write data to, defaults to None :type file: file object, optional :param blockargs: |BlockOptions| :type blockargs: dict :return: A PRINT block :rtype: Print instance """ super().__init__(**blockargs) self.format = fmt self.file = file
# TODO format can be a string or function def step(self, t, inports): prefix = "{:12s}".format("PRINT({:s} (t={:.3f})".format(self.name, t)) value = inports[0] if self.format is None: # no format string if hasattr(value, "strline"): print(prefix, value.strline(), file=self.file) else: print(prefix, str(value), file=self.file) else: # format string provided if isinstance(value, (int, float)): print(prefix, self.format.format(value), file=self.file) elif isinstance(value, np.ndarray): with np.printoptions( formatter={"all": lambda x: self.format.format(x)} ): print(prefix, value, file=self.file) else: print(prefix, str(value), file=self.file)
# ------------------------------------------------------------------------ #
[docs]class Stop(SinkBlock): """ :blockname:`STOP` Conditionally stop simulation. :inputs: 1 :outputs: 0 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - 0 - any - :math:`x` Conditionally stop the simulation if the input :math:`x` is: - bool type and True - numeric type and > 0 If ``func`` is provided, then it is applied to the block input and if it returns True the simulation is stopped. """ nin = 1 nout = 0
[docs] def __init__(self, func=None, **blockargs): """ :param func: evaluate stop condition, defaults to None :type func: callable, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(**blockargs) if func is not None and not callable(func): raise TypeError("argument must be a callable") self.stopfunc = func
def start(self, simstate): self._simstate = simstate def step(self, t, inports): value = inports[0] if self.stopfunc is not None: value = self.stopfunc(value) stop = False if isinstance(value, bool): stop = value else: try: stop = value > 0 except: raise RuntimeError("bad input type to stop block") # we signal stop condition by setting simstate.stop to the block calling # the stop if stop: self._simstate.stop = self
# ------------------------------------------------------------------------ #
[docs]class Null(SinkBlock): """ :blockname:`NULL` Discard signal. :inputs: N :outputs: 0 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - i - any - :math:`x_i` Create a sink block with arbitrary number of input ports that discards all data, like ``/dev/null``. Useful for testing. .. note:: ``bdsim`` issues a warning for unconnected outputs but execution can continue. """ nin = -1 nout = 0
[docs] def __init__(self, nin=1, **blockargs): """ :param nin: number of input ports, defaults to 1 :type nin: int, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(nin=nin, **blockargs)
# ------------------------------------------------------------------------ #
[docs]class Watch(SinkBlock): """ :blockname:`WATCH` Watch a signal. :inputs: N :outputs: 0 :states: 0 .. list-table:: :header-rows: 1 * - Port type - Port number - Types - Description * - Input - i - any - :math:`x_i` Causes the output ports connected to this block's input ports :math:`x_i` to be logged during the simulation run. Equivalent to adding it as the ``watch=`` argument to ``bdsim.run``. For example:: step = bd.STEP(5) ramp = bd.RAMP() watch = bd.WATCH(2) # watch 2 ports watch[0] = step watch[1] = ramp :seealso: :method:`BDSim.run` """ nin = 1 nout = 0
[docs] def __init__(self, **blockargs): """ :param nin: number of input ports, defaults to 1 :type nin: int, optional :param blockargs: |BlockOptions| :type blockargs: dict """ super().__init__(**blockargs)
def start(self, simstate): # called at start of simulation, add this block to the watchlist plug = self.sources[0] # start plug for input wire # append to the watchlist, bdsim.run() will do the rest simstate.watchlist.append(plug) simstate.watchnamelist.append(str(plug))
if __name__ == "__main__": # pragma: no cover from pathlib import Path exec(open(Path(__file__).parent.parent.parent / "tests" / "test_sinks.py").read())