Source code for bdsim.blockdiagram

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon May 18 21:43:18 2020

@author: corkep
"""
import os
from pathlib import Path
import sys
import importlib
import inspect
import traceback
from collections import Counter, namedtuple
from copy import deepcopy
import numpy as np
from colored import fg, attr
import warnings


from ansitable import ANSITable, Column

from bdsim.components import *

# from stubs import BlockDiagramMixin

# ------------------------------------------------------------------------- #


# class BlockDiagram(BlockDiagramMixin):
[docs]class BlockDiagram: r""" Block diagram class. This object is the parent of all blocks and wires in the system. :ivar wirelist: all wires in the diagram :vartype wirelist: list of Wire instances :ivar blocklist: all blocks in the diagram :vartype blocklist: list of Block subclass instances :ivar x: state vector :vartype x: np.ndarray :ivar compiled: diagram has successfully compiled :vartype compiled: bool :ivar blockcounter: unique counter for each block type :vartype blockcounter: collections.Counter :ivar blockdict: index of all blocks by category :vartype blockdict: dict of lists :ivar name: name of this diagram :vartype name: str This object: * holds all the blocks and wires that comprise the system * manages continuous- and discrete-time state vector for the whole system, splitting it across blocks as required * evaluates the entire diagram as a function to compute :meth:`\dot{x} = f(x, t)` """ def __init__(self, name="main", **kwargs): self.wirelist = [] # list of all wires self.blocklist = [] # list of all blocks self.clocklist = [] # list of all clock sources self.compiled = False # network has been compiled self.blockcounter = Counter() self.name = name self.nstates = 0 self.ndstates = 0 self._issubsystem = False self.blocknames = {} self.options = None self.n_auto_sum = 0 self.n_auto_prod = 0 self.n_auto_const = 0 self.n_auto_gain = 0 self.n_auto_pow = 0 def __getitem__(self, id): print(id) if isinstance(id, str): return self.blocknames[id] else: for b in self.blocklist: if b.id == id: return b raise ValueError(f"block {id} not found") def __len__(self): return len(self.blocklist) def __deepcopy__(self, memo): # deep copy a block diagram # retain references (don't copy) to blocks and the runtime cls = self.__class__ result = cls.__new__(cls) memo[id(self)] = result for k, v in self.__dict__.items(): if type(v).__name__ == "method": # it's a block factory method setattr(result, k, v) elif k == "runtime": # it's a reference to the runtime setattr(result, k, v) else: # otherwise, do a deepcopy setattr(result, k, deepcopy(v, memo)) return result @property def issubsystem(self): return self._issubsystem
[docs] def clock(self, *args, **kwargs): clock = Clock(*args, **kwargs) clock.bd = self self.clocklist.append(clock) return clock
[docs] def add_block(self, block): if block.name in self.blocknames: raise ValueError("block {} already added".format(block.name)) block.id = len(self.blocklist) if block.name is None: i = self.blockcounter[block.type] self.blockcounter[block.type] += 1 block.name = "{:s}.{:d}".format(block.type, i) block.bd = self self.blocklist.append(block) # add to the list of available blocks if block in self.blocknames: raise Warning(f"block name {block} is not unique") self.blocknames[block.name] = block
[docs] def add_wire(self, wire, name=None): wire.id = len(self.wirelist) wire.name = name # just add wire to the list, gets instantiated at compile time # when add_output_wire and add_input_wire are called on the blocks return self.wirelist.append(wire)
def __str__(self): return "BlockDiagram: {:s}".format(self.name) def __repr__(self): return str(self) + " with {:d} blocks and {:d} wires".format( len(self.blocklist), len(self.wirelist) ) # for block in self.blocklist: # s += str(block) + "\n" # s += "\n" # for wire in self.wirelist: # s += str(wire) + "\n" # return s.lstrip("\n")
[docs] def ls(self): for k, v in self.blockdict.items(): print("{:12s}: ".format(k), ", ".join(v))
[docs] def connect(self, start, *ends, name=None): """ TODO: s.connect(out[3], in1[2], in2[3]) # one to many block[1] = SigGen() # use setitem block[1] = SumJunction(block2[3], block3[4]) * Gain(value=2) """ # start.type = 'start' # ensure all blocks are in the blocklist for x in [start, *ends]: if isinstance(x, Block): if x.bd is None: self.add_block(x) elif isinstance(x, Plug): if x.block.bd is None: self.add_block(x.block) for end in ends: if isinstance(start, Block): if isinstance(end, Block): # connect(X, Y) # wires from all outport to all inports assert start.nout == end.nin, ( "can only connect blocks where number of input and output ports" " match" ) for i in range(start.nout): wire = Wire(StartPlug(start, i), EndPlug(end, i), name) self.add_wire(wire) elif isinstance(end, Plug) and not end.isslice: # connect(X, Y[i]) assert ( start.nout == 1 ), "can only connect single output block to a port" end.type = "end" wire = Wire(StartPlug(start, 0), end, name) self.add_wire(wire) elif isinstance(end, Plug) and end.isslice: # connect(X, Y[m:n]) assert start.nout == end.width, ( "can only connect single output block to an input port slice of" " width 1" ) end.type = "end" for i in range(start.nout): wire = Wire(StartPlug(start, i), end[i], name) self.add_wire(wire) else: raise ValueError("bad end type") elif isinstance(start, Plug) and not start.isslice: if isinstance(end, Block): # connect(X[i], Y) # wires from all outport to all inports assert ( end.nin == 1 ), "can only connect a port to a block with single input port" wire = Wire(start, EndPlug(end, 0), name) self.add_wire(wire) elif isinstance(end, Plug) and not end.isslice: # connect(X[i], Y[i]) end.type = "end" wire = Wire(start, end, name) self.add_wire(wire) elif isinstance(end, Plug) and end.isslice: # connect(X[i], Y[m:n]) assert ( end.width == 1 ), "can only connect output port to an input port slice of width 1" end.type = "end" wire = Wire(start, end[0], name) self.add_wire(wire) else: raise ValueError("bad end type") elif isinstance(start, Plug) and start.isslice: if isinstance(end, Block): # connect(X[i:j], Y) assert start.width == end.nin, ( "can only connect output slice to a block with matching number" " of input ports" ) for i in range(end.nin): wire = Wire(start[i], EndPlug(end, i), name) self.add_wire(wire) elif isinstance(end, Plug) and not end.isslice: # connect(X[i:j], Y[m]) assert ( start.width == 1 ), "can only connect output slice of width 1 to a port" wire = Wire(start[0], end, name) self.add_wire(wire) if isinstance(end, Plug) and end.isslice: # connect(X[i:j], Y[m:n]) assert ( start.width == end.width ), "can only connect port slices of same width" for i in range(start.width): wire = Wire(start[i], end[i], name) self.add_wire(wire) else: raise ValueError("bad end type") else: raise ValueError("bad start type")
# ---------------------------------------------------------------------- #
[docs] def compile( self, subsystem=False, doimport=True, evaluate=True, report=False, verbose=True ): """ Compile the block diagram :param subsystem: importing a subsystems, defaults to False :type subsystem: bool, optional :param doimport: import subsystems, defaults to True :type doimport: bool, optional :raises RuntimeError: various block diagram errors :return: Compile status :rtype: bool Performs a number of operations: - Check sanity of block parameters - Recursively clone and import subsystems - Check for loops without dynamics - Check for inputs driven by more than one wire - Check for unconnected inputs and outputs - Link all output ports to outgoing wires - Link all input ports to incoming wires - Evaluate all blocks in the network """ # name the elements self.nblocks = len(self.blocklist) self.nwires = len(self.wirelist) error = False self.nstates = 0 self.ndstates = 0 self.statenames = [] self.dstatenames = [] self.blocknames = {} if not subsystem and verbose: print("\nCompiling:") # process all subsystem imports # ssblocks = [b for b in self.blocklist if b.type == 'subsystem'] # for b in ssblocks: # print(' importing subsystem', b.name) # if b.ssvar is not None: # print('-- Wiring in subsystem', b, 'from module local variable ', b.ssvar) self.blocklist, self.wirelist = self._subsystem_import( self, None, verbose=verbose ) # check that wires all point to valid blocks for w in self.wirelist: if w.start.block not in self.blocklist: raise RuntimeError( f"wire {w} starts at unreferenced block {w.start.block}" ) if w.end.block not in self.blocklist: raise RuntimeError(f"wire {w} ends at unreferenced block {w.end.block}") # run block specific checks for b in self.blocklist: try: b.check() except: raise RuntimeError("block failed check " + str(b)) # build a dictionary of all block names self.blocknames = {b.name: b for b in self.blocklist} # visit all stateful blocks for b in self.blocklist: if b.blockclass == "transfer": self.nstates += b.nstates if b._state_names is not None: assert ( len(b._state_names) == b.nstates ), "number of state names not consistent with number of states" self.statenames.extend(b._state_names) else: # create default state names self.statenames.extend( [b.name + "x" + str(i) for i in range(0, b.nstates)] ) if b.blockclass == "clocked": self.ndstates += b.ndstates if b._state_names is not None: assert ( len(b._state_names) == b.nstates ), "number of state names not consistent with number of states" self.dstatenames.extend(b._state_names) else: # create default state names self.statenames.extend( [b.name + "X" + str(i) for i in range(0, b.nstates)] ) # initialize lists of input and output ports for b in self.blocklist: b.output_wires = [[] for i in range(0, b.nout)] b.input_wires = [None for i in range(0, b.nin)] b.sources = [None for i in range(0, b.nin)] # used to build execution plan # TODO: might overlap with sources b._parents = [None for i in range(0, b.nin)] # connect the source and destination blocks to each wire for w in self.wirelist: try: w.start.block.add_output_wire(w) w.end.block.add_input_wire(w) w.end.block._parents[w.end.port] = w.start.block except: print(fg("red")) print("error connecting wire ", w.fullname + ": ", sys.exc_info()[1]) print(attr(0)) error = True # check connections every block for b in self.blocklist: # check all inputs are connected for port, connection in enumerate(b.input_wires): if connection is None: print( " ERROR: [{:s}] input {:d} is not connected".format( str(b), port ) ) error = True # check all outputs are connected for port, connections in enumerate(b.output_wires): if len(connections) == 0: print( " INFORMATION: [{:s}] output {:d} is not connected".format( str(b), port ) ) if b._inport_names is not None: assert ( len(b._inport_names) == b.nin ), "incorrect number of input names given: " + str(b) if b._outport_names is not None: assert ( len(b._outport_names) == b.nout ), "incorrect number of output names given: " + str(b) if b._state_names is not None: assert ( len(b._state_names) == b.nstates ), "incorrect number of state names given: " + str(b) # check for cycles of function blocks def _DFS(path): start = path[0] tail = path[-1] for outgoing in tail.output_wires: # for every port on this block for w in outgoing: dest = w.end.block if dest == start: print( " ERROR: cycle found: ", " - ".join([str(x) for x in path + [dest]]), ) return True if dest.blockclass == "function": return _DFS(path + [dest]) # recurse return False for b in self.blocklist: if b.blockclass == "function": # do depth first search looking for a cycle if _DFS([b]): error = True if error: if not subsystem: raise RuntimeError("could not compile system") # create the execution plan/schedule self.schedule_generate() ## evaluate the network once to check out wire types x = self.getstate0() for clock in self.clocklist: clock._x = clock.getstate0() if report: self.report() self.schedule_report() if not subsystem and evaluate: # run all the blocks for one step try: self.schedule_evaluate(x, 0.0, sinks=False) except RuntimeError as err: print("\nFrom compile: unrecoverable error in value propagation:", err) traceback.print_exc(file=sys.stderr) error = True if error: # show report if there was an error if not report: self.report() if not subsystem: raise RuntimeError("could not compile system") else: self.compiled = True return self.compiled
def _subsystem_import(self, bd, sspath, verbose=False): blocks = [] wires = bd.wirelist for b in bd.blocklist: # rename the block to include subsystem path if sspath is not None: b.name = sspath + "/" + b.name if b.type == "subsystem": # deal with a subsystem # - recurse to import it # - add its blocks and wires to the set if verbose: print("instantiating subsystem ", b.name) ssb, ssw = self._subsystem_import(b.subsystem, b.name) blocks.extend(ssb) wires.extend(ssw) # INPORT/OUTPORT blocks now become simple pass throughs # same number of inputs and outputs b.inport.nin = b.inport.nout b.outport.nout = b.outport.nin # modify the wiring, keep the INPORT/OUTPORT blocks but lose # the SUBSYSTEM blocks for w in bd.wirelist: # for all wires at this level, find those that connect # to the subsystem and tweak them if w.start.block == b: # SS output w.start.block = b.outport if w.end.block == b: # SS input w.end.block = b.inport else: # not a subsystem, just add the block to the list blocks.append(b) # systematically renumber all blocks and wires for i, b in enumerate(blocks): b.id = i for i, w in enumerate(wires): w.id = i return blocks, wires # ---------------------------------------------------------------------- #
[docs] def schedule_evaluate(self, x, t, checkfinite=True, sinks=True, simstate=None): """ Evaluate all blocks in the network :param x: state :type x: ndarray :param t: current time :type t: float :param checkfinite: check for Inf or Nan values in block outputs :type checkfinite: bool :param sinks: evaluate sink blocks, defaults to Trye :type sinks: bool, optional :param simstate: simulation state :return: state derivative :rtype: numpy.ndarray Performs the following steps: 1. Partition the state vector ``x`` to all stateful blocks 2. Execute the blocks in the order given by the ``plan``. The block outputs are "sent" to their connected inputs. Sink blocks are not executed here, but after completion their inputs will all be valid. """ # TODO: don't copy outputs to inputs of next block, have inputs # pull the value from connected inputs self.runtime.DEBUG("state", ">>>>>>>>> t={}, x={} >>>>>>>>>>>>>>>>", t, x) # reset all the blocks ready for the evalation self.reset() # split the state vector to stateful blocks for b in self.blocklist: if b.blockclass == "transfer": x = b.setstate(x) # split the discrete state vector to clocked blocks for clock in self.clocklist: clock.setstate() self.runtime.DEBUG("propagate", "t={:.3f}", t) for sequence, group in enumerate(self.plan): # self.runtime.DEBUG('propagate', '---- sequence = ', sequence) for b in group: # ask the block for output, check for errors try: if sequence == 0: # blocks called at step 0 have no inputs out = b.output(t, None, b._x) else: out = b.output(t, b.inputs, b._x) except Exception as err: # output method failed, report it print(fg("red")) print( "--Error at t={:f} when computing output of [{:s}::{:s}]".format( t, b.type, str(b) ) ) print() # print(' {}'.format(err)) traceback.print_exc(file=sys.stderr) print() for i, input in enumerate(b.inputs): print(f"Input[{i}] = {input}") if b.nstates > 0: print(f"Block state x = {b._x}") print(attr(0)) raise RuntimeError from None self.runtime.DEBUG("propagate", "block {:s}: output = {}", b, out) # check that output is a list of correct length if not isinstance(out, (tuple, list)): raise AssertionError( f"block {b} output {b} must be a list: {type(out)}" ) if len(out) != b.nout: raise AssertionError( f"block {b} output {b} has incorrect length: {len(out)} instead" f" of {b.nout}" ) # TODO check output validity once at the startq # check it has no nan or inf values if ( checkfinite and isinstance(out, (int, float, np.ndarray)) and not np.isfinite(out).any() ): raise RuntimeError(f"block {b} output contains NaN") # # send block outputs to all downstream connected blocks # for (port, outwires) in enumerate(b.outports): # every port # value = out[port] # for w in outwires: # every wire # self.DEBUG('propagate', ' [{}] = {} --> {}[{}]', port, value, w.end.block.name, w.end.port) # # send value to wire # w.send(value) # # TODO send return status no longer needed # # TODO use common error handler in all cases above b.output_values = out if sinks: for b in self.blocklist: if isinstance(b, SinkBlock): b.step(t, b.inputs) # gather the derivative YD = self.deriv(t) self.runtime.DEBUG("deriv", YD) return YD
[docs] def schedule_generate(self): """ Create execution plan The plan is saved in the attribute ``plan`` and is a list ``[L0, L1, ... LN]`` where each ``Li`` is a list of blocks. The blocks in the lists are executed sequentially, ie. all the blocks in ``L0`` then all the blocks in ``L1`` etc. The plan ensures that the inputs of all blocks in ``Li`` have been previously computed. .. note:: - The plan is essentially a dataflow graph. - The blocks in list ``Li`` could potentially be executed in parallel. - Constant blocks and stateful blocks are all executed in ``L0`` - The block attribute ``_sequence`` is ``i`` and indicates its execution order :seealso: :func:`schedule_report`, :func:`schedule_dotfile` """ plan = [] group = [] for b in self.blocklist: b._sequence = None if b.blockclass in ("source", "transfer", "clocked"): b._sequence = 0 group.append(b) plan.append(group) sequence = len(plan) while True: group = [] for b in self.blocklist: if b._sequence is not None: continue # already has a sequence assigned if all( [ p._sequence < sequence if p._sequence is not None else False for p in b._parents ] ): group.append(b) for b in group.copy(): b._sequence = sequence if b.blockclass in ("sink", "graphics"): group.remove(b) if len(group) == 0: break plan.append(group) sequence += 1 self.plan = plan
[docs] def schedule_dotfile(self, filename): """ Write a GraphViz dot file representing the execution schedule :param file: Name of file to write to :type file: str The file can be processed using neato or dot:: % dot -Tpng -o out.png dotfile.dot Display execution plan as a dataflow graph. :seealso: :func:`schedule_plan`, :func:`schedule_print` """ if isinstance(filename, str): file = open(filename, "w") else: file = filename header = r"""digraph G { graph [splines=ortho, rankdir=LR, splines=spline] node [shape=box] """ file.write(header) for sequence, group in enumerate(self.plan): # for each execution group, place the blocks in a subgraph file.write("\tsubgraph step{:d} {{\n".format(sequence)) file.write("\t\trank=same;\n") for b in group: file.write('\t\t"{:s}"\n'.format(b.name)) file.write("\t}\n\n") # connect them to their parents, except if a transfer block for b in self.blocklist: if not b.blockclass == "transfer": for p in b._parents: file.write('\t"{:s}" -> "{:s}"\n'.format(p.name, b.name)) file.write("}\n")
# ---------------------------------------------------------------------- # def _debugger(self, simstate=None, integrator=None): if simstate.t_stop is not None and simstate.t < simstate.t_stop: return def print_output(b, t, inports, x): out = b.output(t, inports, x) if len(out) == 1: print(f"{b.name} = {out[0]}") else: print(f"{b.name}:") for i, o in enumerate(out): print(f" [{i}] = {o}") np.set_printoptions(precision=6, linewidth=120) simstate.t_stop = None if not hasattr(self, "debug_watch"): self.debug_watch = None print("\n") if self.debug_watch is not None: t = simstate.t for b in self.debug_watch: print_output(b, t, b.inputs, b._x) while True: try: t = simstate.t cmd = input(f"(bdsim, t={t:.6f}) ") if len(cmd) == 0: continue if cmd[0] == "p": # print variables if len(cmd) > 1: id = int(cmd[1:]) b = self.blocklist[id] print_output(b, t, b.inputs, b._x) else: for b in self.blocklist: if b.nout > 0: print_output(b, t, b.inputs, b._x) elif cmd[0] == "i": print( f"status={integrator.status}, dt={integrator.step_size:.4g}, nfev={integrator.nfev}" ) elif cmd[0] == "s": # step break elif cmd[0] == "c": # continue self.debug_stop = False self.t_stop = None break elif cmd[0] == "t": self.t_stop = float(cmd[1:]) break elif cmd[0] == "q": sys.exit(1) elif cmd[0] == "r": self.report() elif cmd[0] == "w": if len(cmd) == 1: # clear the watch list print(self.debug_watch) self.debug_watch = None else: self.debug_watch = [ self.blocklist[int(s.strip())] for s in cmd[2:].split(" ") ] elif cmd == "pdb": import pdb pdb.runeval('print("type exit to leave Pdb")') elif cmd[0] in "h?": print("p print all outputs") print("pI print block id I output") print("i print integrator status") print("s single step") print("c continue") print("tT stop at or after time T") print("r print block and wires") print("pdb enter PDB debugger") print("w id watch list, display at every step") print("q quit") except (IndexError, ValueError, TypeError): print("??") pass # ---------------------------------------------------------------------- #
[docs] def report_summary(self, sortby="name", **kwargs): """ Print a summary of block diagram. :param sortby: sort rows by specified block attribute: "name" [default] or "type" :type sortby: str, optional :param style: table style, one of: ansi (default), markdown, latex :type style: str Print a table with 4 columns: 1. Block name, sorted in alphabetical order 2. The input port (if not a source block) 3. The block driving this port (if not a source block) 4. The type of value driving this port (if not a source block) If the block is an event source, add a ``@`` suffix. """ table = ANSITable( Column("block", headalign="^", colalign="<"), Column("type", headalign="^", colalign="<"), Column("inport", headalign="^", colalign="<"), Column("source", headalign="^", colalign="<"), Column("source type", headalign="^", colalign="<"), border="thin", ) if sortby == "name": sortfunc = lambda x: x.name elif sortby == "type": sortfunc = lambda x: x.type first = True legend = None for b in sorted(self.blocklist, key=sortfunc): name = str(b) if isinstance(b, EventSource): name += "@" legend = "Note: @ = event source" # add a divider before each subsequent row if not first: table.rule() else: first = False # print the details if len(b.sources) > 0: # non source block, list all its inputs, one per row inputs = b.inputs for port, source in enumerate(b.sources): # every port value = inputs[port] typ = type(value).__name__ if isinstance(value, np.ndarray): typ += "{:s}.{:s}".format(str(value.shape), str(value.dtype)) src_name = source.block.name if source.block.nout > 1: src_name += f"[{source.port}]" if port == 0: # first row for this block table.row(name, b.type, port, src_name, typ) else: # subsequent rows table.row("", "", port, src_name, typ) else: # source block, just list the name table.row(name, b.type, "", "", "") table.print(**kwargs) if legend: print(legend + "\n")
[docs] def report(self, **kwargs): warnings.warn("use reports_lists() method instead", DeprecationWarning) self.report_lists(**kwargs)
[docs] def report_lists(self, **kwargs): """ Print a tabular report about the block diagram. :param kwargs: options passed to :meth:`ansitable.ANSITable.print` Print the important lists in pretty format. * block list, all blocks * wire list, all wires * clock list, all discrete time clocks """ # print all the blocks print("\nBlocks::\n") table = ANSITable( Column("id"), Column("name"), Column("nin"), Column("nout"), Column("nstate"), Column("ndstate"), Column("type", headalign="^", colalign="<"), border="thin", ) for b in self.blocklist: table.row(b.id, str(b), b.nin, b.nout, b.nstates, b.ndstates, b.type) table.print(**kwargs) # print all the wires print("\nWires::\n") table = ANSITable( Column("id"), Column("from", headalign="^"), Column("to", headalign="^"), Column("description", headalign="^", colalign="<"), Column("type", headalign="^", colalign="<"), border="thin", ) for w in self.wirelist: start = "{:d}[{:d}]".format(w.start.block.id, w.start.port) end = "{:d}[{:d}]".format(w.end.block.id, w.end.port) try: value = w.end.block.inputs[w.end.port] typ = type(value).__name__ if isinstance(value, np.ndarray): typ += "{:s}.{:s}".format(str(value.shape), str(value.dtype)) except: typ = "??" table.row(w.id, start, end, w.fullname, typ) table.print(**kwargs) if len(self.clocklist) > 0: # print all the clocked blocks print("\nClocked blocks::\n") table = ANSITable( Column("id"), Column("block"), Column("clock"), Column("period"), Column("offset"), border="thin", ) for b in self.blocklist: if b.blockclass == "clocked": c = b.clock table.row(b.id, str(b), c.name, c.T, c.offset) table.print(**kwargs) if not self.compiled: print("** System has not been compiled, or had a compile time error")
[docs] def report_schedule(self, **kwargs): """ Display execution schedule in tabular form :param kwargs: options passed to :meth:`ansitable.ANSITable.print` :seealso: :func:`schedule_plan`, :func:`schedule_dotfile` """ table = ANSITable( Column("Step"), Column("Blocks", colalign="<", headalign="^"), border="thin", ) for sequence, group in enumerate(self.plan): table.row(sequence, ", ".join([str(b) for b in group])) table.print(**kwargs)
# ---------------------------------------------------------------------- # def _error_handler(self, where, block): # called from except clause import traceback import types t, v, tb = sys.exc_info() # get the exception print(fg("red")) # red text # print the traceback print(f"[{block.type} block: {block.name}.{where}]: exception {t.__name__}") print(f" {v}\n") traceback.print_tb(tb) # print all block inputs print() for i in range(block.nin): input = block.inputs[i] print( f"input {i} from" f" {block.sources[i].block.name} [{input.__class__.__name__}]" ) print(" ", input) print(attr(0)) # default text # traceback = err[2] # back_frame = traceback.tb_frame.f_back # back_tb = types.TracebackType(tb_next=None, # tb_frame=back_frame, # tb_lasti=back_frame.f_lasti, # tb_lineno=back_frame.f_lineno) # raise RuntimeError('Fatal failure').with_traceback(back_tb) raise RuntimeError("Fatal failure") from None
[docs] def getstate0(self): # get the state from each stateful block x0 = np.array([]) for b in self.blocklist: try: if b.blockclass == "transfer": x0 = np.r_[x0, b.getstate0()] # print('x0', x0) except: self._error_handler("getstate0", b) return x0
[docs] def reset(self): """ Reset conditions within every active block. Most importantly, all inputs are marked as unknown. Invokes the `reset` method on all blocks. """ for b in self.blocklist: try: b.reset() except: self._error_handler("reset", b)
[docs] def step(self, t): """ Step all blocks :param t: simulation time, defaults to None :type t: float :param inports: block input port values :type inports: list Tell all blocks to take action on new inputs by invoking their ``step`` method and passing the ``state`` object. Used to save results to a figure or file. Called at the end of every integration interval. .. note:: - if ``graphics`` is False, Graphics blocks are not called """ # TODO could be done by output method, even if no outputs for b in self.blocklist: try: if isinstance(b, SinkBlock): b.step(t, b.inputs) except: self._error_handler("step", b)
[docs] def deriv(self, t): """ Harvest derivatives from all blocks. :param t: simulation time, defaults to None :type t: float :param simstate: simulation state, defaults to None :type simstate: SimState, optional """ YD = np.array([]) for b in self.blocklist: if b.blockclass == "transfer": try: yd = b.deriv(t, b.inputs, b._x) if not isinstance(yd, np.ndarray): raise AssertionError(f"deriv: block {b} did not return ndarray") if yd.ndim != 1 or yd.shape[0] != b.nstates: raise AssertionError( f"deriv: block {b} returns wrong shape {yd.shape}, should" f" be ({b.nstates},)" ) YD = np.r_[YD, yd] except: self._error_handler("deriv", b) return YD
[docs] def start(self, simstate=None): """ Start all blocks :param simstate: simulation state, defaults to None :type simstate: SimState, optional Inform all blocks that BlockDiagram execution is about to start by invoking their ``start`` method and passing the ``state`` object. Used to open files, create figures etc. .. note:: if ``graphics`` is False, Graphics blocks are not called """ for c in self.clocklist: try: c.start(simstate) except: self._error_handler("start_clocked", c) # safe wrapper for block starting, does error handling for b in self.blocklist: # print('starting block', b) try: b.start(simstate) except: self._error_handler("start", b)
[docs] def initialstate(self): for b in self.blocklist: if b.blockclass in ("transfer", "clocked"): b._x = b._x0
[docs] def done(self, block=False): """ Finishup all blocks :param state: simulation state, defaults to None :type state: SimState, optional :param graphics: graphics enabled, defaults to False :type graphics: bool, optional Inform all blocks that BlockDiagram execution is complete by invoking their ``done`` method and passing options. Used to close files, display figures etc. .. note:: if ``graphics`` is False, Graphics blocks are not called """ for b in self.blocklist: try: b.done(block=block) except: self._error_handler("done", b)
[docs] def dotfile(self, filename, shapes=None): """ Write a GraphViz dot file representing the network. :param file: Name of file to write to, or file handle :type file: str, file handle :param shapes: block shapes :type shapes: dict Create a GraphViz format file for procesing by ``dot``. The graph is: * directed graph, drawn left to right * source blocks are in the first column * sink and graphics blocks are in the last column * ``SUM`` and ``PROD`` blocks have the sign or operation of their input wires labeled. The file can be processed using ``dot``:: % dot -Tpng -o out.png dotfile.dot .. image:: ../../figs/eg1.png :width: 600 :alt: Block diagram represented as a mathematical graph .. note:: By default all blocks have the default shape, with source blocks shown as a rectangle ("record"), and sink/graphics blocks as a rounded rectangle ("Mrecord"). This can be overriden by provide a dictionary ``shapes`` that maps block class (sink, source, graphics, function, transfer) to the names of GraphViz shapes. :seealso: :meth:`showgraph` """ if shapes is None: shapes = dict(source="record", sink="Mrecord", graphics="Mrecord") if isinstance(filename, str): file = open(filename, "w") else: file = filename header = r"""digraph G { rankdir = "LR" """ file.write(header) # add the blocks for b in self.blocklist: options = [] if b.blockclass in shapes: options.append("shape={:s}".format(shapes[b.blockclass])) if b.blockclass == "source": options.append('rank="source"') if b.blockclass in ("sink", "graphics"): options.append('rank="sink"') if b.pos is not None: options.append('pos="{:g},{:g}!"'.format(b.pos[0], b.pos[1])) # options.append( # 'xlabel=<<BR/><FONT POINT-SIZE="8" COLOR="blue">{:s}</FONT>>'.format( # b.type # ) # ) if len(options) > 0: file.write('\t"{:s}" [{:s}]\n'.format(b.name, ", ".join(options))) file.write("\n") # add the wires for w in self.wirelist: options = [] # options.append('xlabel="{:s}"'.format(w.name)) if w.end.block.type == "sum": options.append( 'headlabel="{:s} "'.format(w.end.block.signs[w.end.port]) ) options.append("labeldistance=1.5") if w.end.block.type == "prod": options.append('headlabel="{:s} "'.format(w.end.block.ops[w.end.port])) options.append("labeldistance=1.5") file.write( '\t"{:s}" -> "{:s}" [{:s}]\n'.format( w.start.block.name, w.end.block.name, ", ".join(options) ) ) file.write("}\n")
[docs] def showgraph(self): """ Display diagram as a graph in browser tab :seealso: :meth:`dotfile` """ # Lazy import try: import tempfile import subprocess import webbrowser except ModuleNotFoundError: return # create the temporary dotfile dotfile = tempfile.TemporaryFile(mode="w") self.dotfile(dotfile) # rewind the dot file, create PDF file in the filesystem, run dot dotfile.seek(0) pdffile = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) subprocess.run("dot -Tpdf", shell=True, stdin=dotfile, stdout=pdffile) # open the PDF file in browser (hopefully portable), then cleanup webbrowser.open(f"file://{pdffile.name}")
[docs] def blockvalues(self, t=None, simstate=None): for b in self.blocklist: print("Block {:s}:".format(b.name)) print(" inputs: ", b.inputs) print(" outputs: ", b.output(t, b.inputs, b._x))
if __name__ == "__main__": # pragma: no cover import bdsim bd = bdsim.BlockDiagram() # define the blocks demand = bd.STEP(T=1, pos=(0, 0), name="demand") sum = bd.SUM("+-", pos=(1, 0)) gain = bd.GAIN(10, pos=(1.5, 0)) plant = bd.LTI_SISO(0.5, [2, 1], name="plant", pos=(3, 0)) # scope = bd.SCOPE(pos=(4,0), styles=[{'color': 'blue'}, {'color': 'red', 'linestyle': '--'}) scope = bd.SCOPE(nin=2, styles=["k", "r--"], pos=(4, 0)) # connect the blocks bd.connect(demand, sum[0], scope[1]) bd.connect(plant, sum[1]) bd.connect(sum, gain) bd.connect(gain, plant) bd.connect(plant, scope[0]) bd.compile() # check the diagram bd.report() # list all blocks and wires bd.run(5, debug=True) # from pathlib import Path # exec(open(Path(__file__).parent.absolute() / "test_blockdiagram.py").read())