import os
from pathlib import Path
import sys
import importlib
import inspect
from collections import Counter, namedtuple
import argparse
import types
import warnings
import time
from bdsim.blockdiagram import BlockDiagram
from bdsim.components import OptionsBase, Block, Clock, BDStruct, Plug, clocklist
import spatialmath.base as smb
import tempfile
import subprocess
import webbrowser
import traceback
import numpy as np
import scipy.integrate as integrate
import matplotlib.pyplot as plt
import re
from colored import fg, attr
try:
from progress.bar import FillingCirclesBar
_FillingCirclesBar = True
except ImportError:
_FillingCirclesBar = False
class Progress:
# print a progress bar
# https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console
@staticmethod
def printProgressBar(
fraction, prefix="", suffix="", decimals=1, length=50, fill="█", printEnd="\r"
):
percent = ("{0:." + str(decimals) + "f}").format(fraction * 100)
filledLength = int(length * fraction)
bar = fill * filledLength + "-" * (length - filledLength)
print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd)
def __init__(self, enable=True):
self.enable = enable
self.length = 60
if not enable:
return
def start(self, T):
self.T = T
if not self.enable:
return
if _FillingCirclesBar:
self.bar = FillingCirclesBar(
"bdsim", max=100, suffix="%(percent).1f%% - %(eta)ds"
)
else:
self.printProgressBar(
0, prefix="Progress:", suffix="complete", length=self.length
)
def end(self):
"""
Clean up progress bar
"""
if not self.enable:
return
if _FillingCirclesBar:
self.bar.finish()
else:
print("\r" + " " * (self.length + 20) + "\r")
def update(self, t):
"""
Update progress bar
:param t: current simulation time, defaults to None
:type t: float, optional
Update progress bar as a percentage of the maximum simulation time,
given as an argument to ``run``.
:seealso: :meth:`run` :meth:`progress_done`
"""
if not self.enable:
return
if _FillingCirclesBar:
self.bar.goto(round(t / self.T * 100))
else:
self.printProgressBar(
t / self.T, prefix="Progress:", suffix="complete", length=self.length
)
class TimeQ:
"""
Time-ordered queue for events
The list comprises tuples of (time, block) to reflect an event associated
with the specified block at the specified time.
The list is not ordered, and is sorted on a pop event.
"""
def __init__(self):
self.q = []
self.dirty = False
def __len__(self):
"""
Length of time-ordered queue
:return: number of items in the queue
:rtype: int
"""
return len(self.q)
def __str__(self):
"""
String representation of time-ordered queue
:return: show length and first item
:rtype: str
"""
if len(self) == 0:
return f"TimeQ: len={len(self)}"
else:
return f"TimeQ: len={len(self)}, first out {self.q[0]}"
def __repr__(self):
events = []
for t in self.q:
events.append(str(t))
return "\n".join(events)
def push(self, value):
"""
Push value onto time-ordered queue
:param value: tuple (time, block)
:type value: tuple
Push a block and a time onto the queue.
"""
self.q.append(value)
self.dirty = True
def pop(self, dt=0):
"""
Pop nearest items from the time-ordered queue
:param dt: time window, defaults to 0
:type dt: float, optional
:return: time of first block in queue and a list of blocks within the time window
:rtype: float, list
The next block is popped from the queue and all blocks in the time
window, that occur no more than ``dt`` later, are also popped.
"""
if len(self) == 0:
return None, []
if self.dirty:
self.q.sort(key=lambda x: x[0])
self.dirty = False
qfirst = self.q.pop(0)
t = qfirst[0]
blocks = [qfirst[1]]
while len(self.q) > 0 and self.q[0][0] < (t + dt):
blocks.append(self.q.pop(0)[1])
return t, blocks
def pop_until(self, t):
"""
Pop nearest items from time-ordered queue
:param t: time
:type t: float
:return: list of blocks remaining sorted by receding time
:rtype: list
Pops all items with time less than or equal to ``t``.
"""
if len(self) == 0:
return []
if self.dirty:
self.q.sort(key=lambda x: x[0])
self.dirty = False
i = 0
while True:
if self.q[i][0] > t:
out = self.q[:i]
self.q = self.q[i:]
return out
i += 1
# convert class name to BLOCK name
# strip underscores and capitalize
def blockname(name):
return name.upper()
[docs]class BDSimState:
"""
:ivar x: state vector
:vartype x: np.ndarray
:ivar T: maximum simulation time (seconds)
:vartype T: float
:ivar t: current simulation time (seconds)
:vartype t: float
:ivar fignum: number of next matplotlib figure to create
:vartype fignum: int
:ivar stop: reference to block wanting to stop simulation, else None
:vartype stop: Block subclass
:ivar checkfinite: halt simulation if any wire has inf or nan
:vartype checkfinite: bool
:ivar graphics: enable graphics
:vartype graphics: bool
"""
[docs] def __init__(self):
self.x = None # continuous state vector numpy.ndarray
self.T = None # maximum.BlockDiagram time
self.t = None # current time
self.fignum = 0
self.stop = None
self.checkfinite = True
self.debugger = True
self.t_stop = None # time-based breakpoint
self.eventq = TimeQ()
[docs] def declare_event(self, block, t):
self.eventq.push((t, block))
[docs]class BDSim:
_blocklibrary = None
[docs] def __init__(self, banner=True, packages=None, load=True, toolboxes=True, **kwargs):
"""
:param banner: display docstring banner, defaults to True
:type banner: bool, optional
:param packages: colon-separated list of folders to search for blocks
:type packages: str
:param load: dynamically load blocks from libraries, defaults to True
:type load: bool,optional
:param sysargs: process options from sys.argv, defaults to True
:type sysargs: bool, optional
:param graphics: enable graphics, defaults to True
:type graphics: bool, optional
:param animation: enable animation, defaults to False
:type animation: bool, optional
:param progress: enable progress bar, defaults to True
:type progress: bool, optional
:param debug: debug options, defaults to None
:type debug: str, optional
:param backend: matplotlib backend, defaults to 'Qt5Agg''
:type backend: str, optional
:param tiles: figure tile layout on monitor, defaults to '3x4'
:type tiles: str, optional
:raises ImportError: syntax error in block
:return: parent object for blockdiagram simulation
:rtype: BDSim
If ``sysargs`` is True, process command line arguments and passed
options. Command line arguments have precedence.
=================== ========= ======== ===========================================
Command line switch Argument Default Behaviour
=================== ========= ======== ===========================================
--graphics, +g graphics True enable graphical display
--animation, +a animation True update graphics at each time step
--hold, +h hold True hold graphics in done()
--no-graphics, -g graphics True disable graphical display
--no-animation, -a animation True don't update graphics at each time step
--no-hold, -H hold True do not hold graphics in done()
--no-progress, -p progress True do not display simulation progress bar
--backend BE backend 'Qt5Agg' matplotlib backend
--tiles RxC, -t RxC tiles '3x4' arrangement of figure tiles on the display
--shape WxH shape None window size, default matplotlib size
--altscreen, +A, altscreen True display plots on second monitor
--no-altscreen, -A altscreen True do not display plots on second monitor
--debug F, -d F debug '' debug flag string
--simtime T[,dt] simtime (10,) simulation time
--verbose, -v verbose False be verbose
--quiet, -q quiet False suppress reports
-o outfile None output pickled simulation results to bd.out
--out OUTFILE outfile None file to save pickled simulation results
--set P, -s P setparam [] override block parameter using ``P=block:param=value``
--global G setglob [] override global parameter using ``G=var=value``
=================== ========= ======== ===========================================
.. note:: ``animation`` and ``graphics`` options are coupled. If
``graphics=False``, all graphics is suppressed. If
``graphics=True`` then graphics are shown and the behaviour depends
on ``animation``. ``animation=False`` shows graphs at the end of
the simulation, while ``animation=True` will animate the graphs
during simulation.
:seealso: :meth:`set_globals()`
"""
self.packages = packages
# process command line and overall options
self.options = Options(**kwargs)
# print docstring as a startup banner
if banner and not self.options.quiet:
calling_frame = inspect.currentframe().f_back
try:
doc = calling_frame.f_locals["__doc__"]
if doc is not None:
for line in doc.strip().split("\n"):
print("* " + line)
except KeyError:
pass
# load modules from the blocks folder
if BDSim._blocklibrary is None and load:
BDSim._blocklibrary = self.load_blocks(
self.options.verbose, toolboxes=toolboxes
)
if self.options.blocks:
self.blocks()
[docs] def blockinfo(self, block=None):
"""Return info about all blocks
:param block: name of block to return info for, otherwise list of info for all
:type block: str, optional
:returns: parameters of blocks
:rtype: dict or list of dicts
Detailed metadata about a block is obtained by introspection and parsing the block's docstring.
========== =====================================================
Key Description
========== =====================================================
path Path to the folder containing block definition
classname Name of class
url URL of online documentation
class Reference to the class
module Name of the module package.blocks.module
package Name of the package, eg. bdsim, roboticstoolbox
params Dict of (type, descrip), indexed by parameter name
inputs List of names of block inputs
outputs List of names of block outputs
nin Number of inputs, -1 if variable
nout Number of outputs, -1 if variable
blockclass Block class, eg. source, sink etc.
========== =====================================================
"""
if block is None:
return self._blocklibrary
else:
return self._blocklibrary[block]
def __str__(self):
"""
String representation of simulation
:return: single line summary of simulation environment
:rtype: str
"""
s = f"BDSim: {len(self._blocklibrary)} blocks in library\n"
return s
def __repr__(self):
s = (
f"Block diagram simulation runtime, {len(self._blocklibrary)} blocks"
" imported to library.\n"
)
s += "simulation options:\n"
for k, v in self.state.options.items():
s += " {:s}: {}\n".format(k, v)
return s
[docs] def run(
self,
bd,
T=5,
dt=None,
solver="RK45",
solver_args={},
debug="",
block=None,
checkfinite=True,
minstepsize=1e-12,
watch=[],
):
"""
Run the block diagram
:param T: maximum integration time, defaults to 10.0
:type T: float, optional
:param dt: maximum time step
:type dt: float, optional
:param solver: integration method, defaults to ``RK45``
:type solver: str, optional
:param block: matplotlib block at end of run, default False
:type block: bool
:param checkfinite: error if inf or nan on any wire, default True
:type checkfinite: bool
:param minstepsize: minimum step length, default 1e-6
:type minstepsize: float
:param watch: list of input ports to log
:type watch: list
:param solver_args: arguments passed to ``scipy.integrate``
:type solver_args: dict
:return: time history of signals and states
:rtype: Sim class
Assumes that the network has been compiled.
The system is simulated from time 0 to ``T``.
The integration step time ``dt`` defaults to ``T/100`` but can be
specified. Finer control can be achieved using ``max_step`` and
``first_step`` parameters to the underlying integrator using the
``solver_args`` parameter.
Results are returned in a class with attributes:
- ``t`` the time vector: ndarray, shape=(M,)
- ``x`` is the state vector: ndarray, shape=(M,N)
- ``xnames`` is a list of the names of the states corresponding to columns of `x`, eg. "plant.x0",
defined for the block using the ``snames`` argument
- ``yN`` for a watched input where N is the index of the port mentioned in the ``watch`` argument
- ``ynames`` is a list of the names of the input ports being watched, same order as in ``watch`` argument
If there are no dynamic elements in the diagram, ie. no states, then ``x`` and ``xnames`` are not
present.
The ``watch`` argument is a list of one or more input ports whose value during simulation
will be recorded. The elements of the list can be:
- a ``Block`` reference, which is interpretted as input port 0
- a ``Plug`` reference, ie. a block with an index or attribute
- a string of the form "block[i]" which is port i of the block named block.
The debug string comprises single letter flags:
- 'p' debug network value propagation
- 's' debug state vector
- 'd' debug state derivative
.. note:: Simulation stops if the step time falls below ``minsteplength``
which typically indicates that the solver is struggling with a very
harsh non-linearity.
"""
assert bd.compiled, "Network has not been compiled"
# get simulation time
# --simtime=T or --simtime=T,dt
if self.options.simtime is not None:
try:
default_times = eval(self.options.simtime)
if isinstance(default_times, (int, float)):
T = default_times
elif isinstance(default_times, tuple):
T, dt = default_times
else:
raise ValueError(
"bad simtime option passed " + self.options.simtime
)
except:
raise ValueError("bad simtime option passed " + self.options.simtime)
# final default values
# T = T or 5
# dt = dt or 0.01
simstate = BDSimState()
self.simstate = simstate
simstate.T = T
if dt is None and not "max_step" in solver_args:
dt = T / 100
simstate.dt = dt
simstate.count = 0
simstate.bdtime = 0.0
simstate.gtime = 0.0 # last graphics update
simstate.solver = solver
simstate.solver_args = solver_args
simstate.minstepsize = minstepsize
simstate.stop = None # allow any block to stop.BlockDiagram by setting this to the block's name
simstate.checkfinite = checkfinite
# state.options = copy.copy(self.options)
simstate.options = self.options
self.bd = bd
simstate.t_stop = None
if debug:
# append debug flags
if debug not in simstate.options.debug:
simstate.options.debug += debug
# turn off progress bar if any debug options are given
if len(simstate.options.debug) > 0:
self.options.progress = False
if block is not None:
self.options.hold = block
# process the watchlist
# elements can be:
# - block or Plug reference
# - str in the form BLOCKNAME[PORT]
watchlist = []
watchnamelist = []
re_block = re.compile(r"(?P<name>[^[]+)(\[(?P<port>[0-9]+)\])?")
for w in watch:
if isinstance(w, str):
# a name was given, with optional port number
m = re_block.match(w)
if m is None:
raise ValueError("watch block[port] not found: " + w)
name = m.group("name")
# get optional port number
port = m.group("port")
if port is None:
port = 0
else:
port = int(port)
b = bd.blocknames[name]
plug = b[port]
elif isinstance(w, Block):
# a block was given, defaults to port 0
plug = w[0]
elif isinstance(w, Plug):
# a plug was given
plug = w
watchlist.append(plug)
watchnamelist.append(str(plug))
simstate.watchlist = watchlist
simstate.watchnamelist = watchnamelist
x0 = bd.getstate0()
if not self.options.quiet:
print(fg("yellow"))
print(f">>> Start simulation: T = {T}, dt = {dt}")
print(f" Continuous state variables: {bd.nstates}")
print(" x0 = ", x0)
print(f" Discrete state variables: {bd.ndstates}")
# get the number of discrete states from all clocks
ndstates = 0
for clock in bd.clocklist:
nds = 0
for b in clock.blocklist:
nds += b.ndstates
ndstates += nds
if not self.options.quiet:
print(f" {clock.name}: x0 = ", clock.getstate0())
if not self.options.quiet:
print(attr(0))
# update block parameters given on command line
self.update_parameters(bd)
# tell all blocks we're starting a BlockDiagram
self.bd.start(simstate)
# initialize list of time and states
simstate.tlist = []
simstate.xlist = []
simstate.plist = [[] for p in simstate.watchlist]
self.progress = Progress(enable=self.options.progress)
self.progress.start(T)
if len(simstate.eventq) == 0:
# no simulation events, solve it in one go
self.run_interval(bd, 0, T, x0, simstate=simstate)
nintervals = 1
else:
# we have simulation events, solve it in chunks
simstate.declare_event(None, T) # add an event at end of simulation
# ignore all the events at zero
tprev = 0
simstate.eventq.pop_until(tprev)
# get the state vector
x = x0
nintervals = 0
while True:
# get next event from the queue and the list of blocks or
# clocks at that time
tnext, sources = simstate.eventq.pop(dt=1e-6)
if tnext is None:
break
# run system until next event time
x = self.run_interval(bd, tprev, tnext, x, simstate=simstate)
nintervals += 1
# visit all the blocks and clocks that have an event now
for source in sources:
if isinstance(source, Clock):
# clock ticked, save its state
source.savestate(tnext)
source.next_event(self.simstate)
# get the new state
source._x = source.getstate(tnext)
tprev = tnext
# are we done?
if simstate.t is not None and simstate.t >= T:
break
# finished integration
self.progress.end() # cleanup the progress bar
# print some info about the integration
if not self.options.quiet:
print(fg("yellow"))
print("<<< Simulation complete")
print(f" block diagram evaluations: {simstate.count}")
print(
" block diagram exec time: "
f" {simstate.bdtime / simstate.count * 1000.0:.3f} ms"
)
print(f" time steps: {len(simstate.tlist)}")
print(f" integration intervals: {nintervals}")
print(attr(0))
# save buffered data in a Struct
out = BDStruct(name="results")
out.t = np.array(simstate.tlist)
out.x = np.array(simstate.xlist)
out.xnames = bd.statenames
# save clocked states
for c in bd.clocklist:
name = c.name.replace(".", "")
clockdata = BDStruct(name)
clockdata.t = np.array(c.t)
clockdata.x = np.array(c.x)
out.add(name, clockdata)
# save the watchlist into variables named y0, y1 etc.
for i, p in enumerate(watchlist):
out["y" + str(i)] = np.array(simstate.plist[i])
out.ynames = watchnamelist
# the command line options -o or --out saves results as a pickle file
# -o defaults to bd.out
# --out FILE allows the filename to be specified
#
# we can visualize the output file by
#
# % python -mpickle bd.out
# t = ndarray:float64 (123,)
# x = ndarray:float64 (123, 1)
# xnames = ['plantx0'] (list)
# ynames = [] (list)
if self.options.outfile is not None:
out.dump(self.options.outfile)
if not self.options.quiet:
print("simulation results pickled --> ", self.options.outfile)
# pause until all graphics blocks close
if self.options.graphics and self.options.hold:
self.done(self.bd, block=self.options.hold)
return out
[docs] def update_parameters(self, bd):
"""
Set value of parameters according to command line arguments
Command line arguments of the form:
``-s block:param=value``
``--set block:param=value``
are stored as list items in ``options.setparam``
``block`` can be either:
- the block's name as a string, either user assigned or bdsim assigned
- the block ``id`` as displayed by the ``report`` method
``param`` is the name of the parameter used in the constructor
``value`` is the new value of the variable
"""
re_set = re.compile(r"(?P<block>[\w\.]+):(?P<param>[\w]+)=(?P<value>.*)")
for s in self.options.setparam:
m = re_set.match(s)
if m is None:
raise ValueError("bad set parameter: " + s)
# get block reference
blockname = m["block"]
try:
blockname = int(blockname)
except ValueError:
pass
block = bd[blockname]
param = m["param"]
try:
prev_value = getattr(block, param)
except ValueError:
raise ValueError(f"block {block.name} has no parameter '{param}'")
# get the parameter
value = m["value"]
new_value = None
try:
if ";" in value:
new_value = smb.str2array(value)
else:
try:
new_value = int(value)
except ValueError:
new_value = float(value)
except ValueError:
raise ValueError("cannot parse value " + value)
# change the value
setattr(block, param, new_value)
print(
f"changed value of {block.name}:{param} from {prev_value} ->"
f" {new_value}"
)
[docs] def run_interval(self, bd, t0, T, x0, simstate=None):
"""
Integrate system over interval
:param bd: the system blockdiagram
:type bd: BlockDiagram
:param t0: initial time
:type t0: float
:param tf: final time
:type tf: float
:param x0: initial state vector
:type x0: ndarray(n)
:param simstate: simulation state object
:type simstate: SimState
:return: final state vector xf
:rtype: ndarray(n)
The system is integrated from from ``x0`` to ``xf`` over the interval ``t0`` to ``tf``.
"""
try:
if bd.nstates > 0:
# system has continuous states, solve it using numerical integration
# print('initial state x0 = ', x0)
# block diagram contains states, solve it using numerical integration
scipy_integrator = integrate.__dict__[
simstate.solver
] # get user specified integrator
def ydot(t, y):
simstate.t = t
simstate.count += 1
t0 = time.time()
yd = bd.schedule_evaluate(y, t, sinks=False, simstate=simstate)
t1 = time.time()
simstate.bdtime += t1 - t0
return yd
if simstate.dt is not None:
simstate.solver_args["max_step"] = simstate.dt
# print(f"run interval: from {t0} to {t0+T}, args={state.solver_args}, x0={x0}")
integrator = scipy_integrator(
ydot, t0=t0, y0=x0, t_bound=T, **simstate.solver_args
)
# integrate
while integrator.status == "running":
# step the integrator, calls _deriv and evaluate block diagram multiple times
message = integrator.step()
if integrator.status == "failed":
print(
fg("red")
+ f"\nintegration completed with failed status: {message}"
+ attr(0)
)
break
# stash the results
simstate.t = integrator.t
simstate.tlist.append(integrator.t)
simstate.xlist.append(integrator.y)
# record the ports on the watchlist
for i, p in enumerate(simstate.watchlist):
b = p.block
out = b.output(integrator.t, b.inputs, b._x)[p.port]
simstate.plist[i].append(out)
# update all blocks that need to know
if (integrator.t - simstate.gtime) > (simstate.T / 200):
bd.step(integrator.t)
simstate.gtime = integrator.t
# bd.step(integrator.t)
self.progress.update(simstate.t) # update the progress bar
if integrator.status == "finished":
break
# has any block called a stop?
if simstate.stop is not None:
print(
fg("red") + f"\n--- stop requested at t={simstate.t:.4f} by"
f" {simstate.stop}" + attr(0)
)
break
if (
simstate.minstepsize is not None
and integrator.step_size < simstate.minstepsize
):
print(
fg("red") + "\n--- stopping on minimum step size at"
f" t={simstate.t:.4f} with last stepsize"
f" {integrator.step_size:g}" + attr(0)
)
break
if "i" in simstate.options.debug:
bd._debugger(simstate, integrator)
return integrator.y # return final state vector
elif len(clocklist) == 0:
# block diagram has no continuous or discrete states
assert simstate.dt is not None, "if no states must specify dt"
for t in np.arange(t0, T, simstate.dt): # step through the time range
# evaluate the block diagram
simstate.t = t
simstate.count += 1
t0 = time.time()
bd.schedule_evaluate([], t)
t1 = time.time()
simstate.bdtime += t1 - t0
# stash the results
simstate.tlist.append(t)
# record the ports on the watchlist
for i, p in enumerate(simstate.watchlist):
b = p.block
out = b.output(integrator.t, b.inputs, b._x)[p.port]
simstate.plist[i].append(out)
# update all blocks that need to know
bd.step(t)
self.progress.update(t) # update the progress bar
# has any block called a stop?
if simstate.stop is not None:
print(
fg("red") + f"\n--- stop requested at t={simstate.t:.4f} by"
f" {simstate.stop}" + attr(0)
)
break
if "i" in simstate.options.debug:
bd._debugger(simstate, integrator)
else:
# block diagram has no continuous states
t = t0
simstate.t = t
# evaluate the block diagram
simstate.count += 1
t0 = time.time()
bd.schedule_evaluate([], t)
t1 = time.time()
simstate.bdtime += t1 - t0
# stash the results
simstate.tlist.append(t)
# record the ports on the watchlist
for i, p in enumerate(simstate.watchlist):
b = p.block
out = b.output(integrator.t, b.inputs, b._x)[p.port]
simstate.plist[i].append(out)
# update all blocks that need to know
if (t - simstate.gtime) > (simstate.T / 200):
bd.step(t)
simstate.gtime = t
# bd.step(t)
self.progress.update(simstate.t) # update the progress bar
# has any block called a stop?
if simstate.stop is not None:
print(
fg("red") + f"\n--- stop requested at t={simstate.t:.4f} by"
f" {simstate.stop}" + attr(0)
)
if "i" in simstate.options.debug:
bd._debugger(simstate)
except RuntimeError as err:
# bad things happens, print a message and return no result
print("unrecoverable error in evaluation: ", err)
raise
[docs] def blockdiagram(self, name="main") -> BlockDiagram:
"""
Instantiate a new block diagram object.
:param name: diagram name, defaults to 'main'
:type name: str, optional
:return: parent object for blockdiagram
:rtype: BlockDiagram
This object describes the connectivity of a set of blocks and wires.
It is an instantiation of the ``BlockDiagram`` class with a factory
method for every dynamically loaded block which returns
an instance of the block. These factory methods have names
which are all upper case, for example, the method ``.GAIN`` invokes
the constructor for the ``Gain`` class.
:seealso: :func:`BlockDiagram`
"""
# instantiate a new blockdiagram
bd = BlockDiagram(name=name)
def new_method(cls, bd):
# return a wrapper for the block constructor that automatically
# adds the block to the diagram's blocklist
def block_init_wrapper(self, *args, **kwargs):
block = cls(*args, bd=bd, **kwargs) # call __init__ on the block
return block
# return a function that invokes the class constructor
f = block_init_wrapper
# move the __init__ docstring to the class to allow BLOCK.__doc__
f.__doc__ = cls.__init__.__doc__
return f
# bind the block constructors as new methods on this instance
self.blockdict = {}
for blockname, info in self._blocklibrary.items():
# create a function to invoke the block's constructor
f = new_method(info["class"], bd)
# set a bound version of this function as an attribute of the instance
# method = types.MethodType(new_method, bd)
# setattr(bd, block.name, method)
setattr(bd, blockname, f.__get__(self))
# add a clone of the options
# bd.options = copy.copy(self.options)
bd.runtime = self
return bd
[docs] def DEBUG(self, debug, fmt, *args):
if debug[0] in self.options.debug:
print(f"DEBUG.{debug:s}: " + fmt.format(*args))
[docs] def done(self, bd, block=False):
if self.options.hold:
block = self.options.hold
try:
plt.show(block=block)
except KeyboardInterrupt:
print("bdsim: closing all windows")
plt.close("all")
# sys.exit(1) # not sure why we have this
return
bd.done()
plt.close("all")
plt.pause(0.5) # let the event handler do its work
[docs] def closefigs(self):
for i in range(self.simstate.fignum):
print("close", i + 1)
plt.close(i + 1)
plt.pause(0.1)
self.simstate.fignum = 0 # reset figure counter
[docs] def savefig(self, block, filename=None, format="pdf", **kwargs):
block.savefig(filename=filename, format=format, **kwargs)
[docs] def savefigs(self, bd, format="pdf", **kwargs):
from bdsim.graphics import GraphicsBlock
for b in bd.blocklist:
if isinstance(b, GraphicsBlock):
b.savefig(filename=b.name, format=format, **kwargs)
[docs] def showgraph(self, bd, **kwargs):
# create the temporary dotfile
dotfile = tempfile.TemporaryFile(mode="w")
bd.dotfile(dotfile, **kwargs)
# rewind the dot file, create PDF file in the filesystem, run dot
dotfile.seek(0)
pdffile = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
subprocess.run("dot -Tpdf", shell=True, stdin=dotfile, stdout=pdffile)
# open the PDF file in browser (hopefully portable), then cleanup
webbrowser.open(f"file://{pdffile.name}")
os.remove(pdffile.name)
[docs] def fatal(self, message, retval=1):
"""
Fatal simulation error
:param message: Error message
:type message: str
:param retval: system return value (*nix only) defaults to 1
:type retval: int, optional
Display the error message then terminate the process. For operating
systems that support it, return an integer code.
"""
# TODO print text in some color
print(message)
sys.exit(retval)
[docs] def load_blocks(self, verbose=True, toolboxes=True):
"""
Dynamically load all block definitions.
:raises ImportError: module could not be imported
:return: dictionary of block metadata
:rtype: dict of dict
Reads blocks from .py files found in bdsim/bdsim/blocks, folders
given by colon separated list in envariable BDSIMPATH, and the
command line option ``packages``.
The result is a dict indexed by the upper-case block name with elements:
- ``path`` to the folder holding the Python file defining the block
- ``classname``
- ``blockname``, upper case version of ``classname``
- ``url`` of online documentation for the block
- ``package`` containing the block
- `doc` is the docstring from the class constructor
"""
def parse_docstring(ds):
# this should have two versions: sphinx, numpy doc styles
import re
from collections import OrderedDict
re_isfield = re.compile(r"\s*:[a-zA-Zα-ωΑ-Ω0-9_ ]+:")
re_field = re.compile(
r"^\s*:(?P<field>[a-zA-Z]+)(?:"
r" +(?P<var>[a-zA-Zα-ωΑ-Ω0-9_]+))?:(?P<body>.+)$"
)
# a-zA-Zα-ωΑ-Ω0-9_
def indent(s):
return len(s) - len(s.lstrip())
fieldnames = ("param", "type", "input", "output")
excludevars = ("kwargs", "inputs")
# parse out all lines of the form:
#
# :field var: body
# or
# :field var: body with a very long description that
# carries over to another line or two
fieldlines = []
for para in ds.split("\n\n"):
# print(para)
# print('--')
indent_prev = None
infield = False
for line in para.split("\n"):
if len(line) == 0:
continue
if indent_prev is None:
indent_prev = indent(line)
if re_isfield.match(line) is not None:
fieldlines.append(line.lstrip())
infield = True
if indent(line) > indent_prev and infield:
fieldlines[-1] += " " + line.lstrip()
if indent(line) == indent_prev:
infield = False
# fieldlines is a list of lines of the form
#
# :field var: body
#
# where extension lines have been concatenated
# create a dict of dicts
#
# dict[field][var] -> body
dict = OrderedDict()
for line in fieldlines:
m = re_field.match(line)
if m is not None:
field, var, body = m.groups()
if var in excludevars or field not in fieldnames:
continue
if field not in dict:
dict[field] = {var: body}
else:
dict[field][var] = body
dict[m.group("field")]
# now connect pairs of lines of the form
#
# :param X: param description
# :type X: type description
#
# params[X] = (type description, param description)
params = {}
if "param" in dict:
for var, descrip in dict["param"].items():
typ = dict["type"].get(var, None)
params[var] = (typ, descrip)
return params
block = namedtuple("block", "name, cls, path")
if toolboxes:
packages = ["bdsim", "roboticstoolbox", "machinevisiontoolbox"]
else:
packages = ["bdsim"]
env = os.getenv("BDSIMPATH")
if env is not None:
packages += env.split(":")
if self.packages is not None:
packages += self.packages.split(":")
blocks = {}
moduledicts = {}
for package in packages:
try:
spec = importlib.util.find_spec(".blocks", package=package)
if spec is None:
print(f"package {package} has no blocks module")
continue
pkg = spec.loader.load_module()
except ModuleNotFoundError:
print(f"package {package} not found")
continue
except ImportError:
print(f"package {package} load error, continuing")
import textwrap
print(textwrap.indent(traceback.format_exc(), " "))
continue
moduledict = {}
for name, value in pkg.__dict__.items():
# check if it's a valid block class
if not inspect.isclass(value):
continue
if Block not in inspect.getmro(value):
continue
if name.endswith("Block"):
continue
if value.blockclass in ("source", "transfer", "function"):
# must have an output function
valid = (
hasattr(value, "output")
and callable(value.output)
and len(inspect.signature(value.output).parameters) == 4
)
if not valid:
print(
"block {:s} has missing/improper output method".format(
value.__name__
)
)
continue
if value.blockclass == "sink":
# must have a step function with at least one
# parameter: step(self [,state])
valid = (
hasattr(value, "step")
and callable(value.step)
and len(inspect.signature(value.step).parameters) == 3
)
if not valid:
print(
"block {:s} has missing/improper step method".format(
value.__name__
)
)
continue
# add it to the dict of blocks indexed by module
if value.__module__ in moduledict:
moduledict[value.__module__].append(name)
else:
moduledict[value.__module__] = [name]
# create a dict for the block with metadata
block_info = {}
block_info[
"path"
] = pkg.__path__ # path to folder holding block definition
block_info["classname"] = name
block_info["blockname"] = blockname(name)
try:
block_info["url"] = (
pkg.__dict__["url"] + "#" + block.__module__ + "." + name
)
except KeyError:
block_info["url"] = None
block_info["class"] = value
block_info["module"] = value.__module__
block_info["package"] = package
# get the docstring from the class and the constructor
ds = ""
if value.__doc__ is not None:
ds += value.__doc__
if value.__init__.__doc__ is not None:
ds += value.__init__.__doc__
if ds is None:
raise ValueError("block has no docstring")
block_info["doc"] = ds
param_dict = parse_docstring(ds)
block_info["params"] = param_dict
# now add all the other stuff we know about the block
block_info["inputs"] = param_dict.get("input")
block_info["outputs"] = param_dict.get("output")
block_info["nin"] = value.nin
block_info["nout"] = value.nout
block_info["blockclass"] = value.__base__.__name__.lower().replace(
"block", ""
)
blocks[blockname(name)] = block_info
moduledicts[package] = moduledict
self.moduledicts = moduledicts
return blocks
[docs] def blocks(self):
"""
List all loaded blocks.
Example::
73 blocks loaded
bdsim.blocks.functions..................: Sum Prod Gain Clip Function Interpolate
bdsim.blocks.sources....................: Constant Time WaveForm Piecewise Step Ramp
bdsim.blocks.sinks......................: Print Stop Null Watch
bdsim.blocks.transfers..................: Integrator PoseIntegrator LTI_SS LTI_SISO
bdsim.blocks.discrete...................: ZOH DIntegrator DPoseIntegrator
bdsim.blocks.linalg.....................: Inverse Transpose Norm Flatten Slice2 Slice1 Det Cond
bdsim.blocks.displays...................: Scope ScopeXY ScopeXY1
bdsim.blocks.connections................: Item Dict Mux DeMux Index SubSystem InPort OutPort
roboticstoolbox.blocks.arm..............: FKine IKine Jacobian Tr2Delta Delta2Tr Point2Tr TR2T FDyn IDyn Gravload
........................................: Inertia Inertia_X FDyn_X ArmPlot Traj JTraj LSPB CTraj CirclePath
roboticstoolbox.blocks.mobile...........: Bicycle Unicycle DiffSteer VehiclePlot
roboticstoolbox.blocks.uav..............: MultiRotor MultiRotorMixer MultiRotorPlot
machinevisiontoolbox.blocks.camera......: Camera Visjac_p EstPose_p ImagePlane
"""
def dots(s, n=40):
return s + "." * (n - len(s))
print(len(self._blocklibrary), " blocks loaded")
for pkg, dict in self.moduledicts.items():
for k, v in dict.items():
s = ""
once = False
while len(v) > 0:
n = v.pop(0) + " "
if len(s + n) < 80:
s += n
continue
else:
# line will be too long
if not once:
print(f"{dots(k)}: {s}")
once = True
else:
print(f"{dots('')}: {s}")
s = ""
if len(s) > 0:
if once:
print(f"{dots('')}: {s}")
else:
print(f"{dots(k)}: {s}")
[docs] def set_options(self, **options):
self.options.set(**options)
warnings.warn("use sim.options.OPT=VALUE instead", DeprecationWarning)
[docs] def set_globals(self, globs):
"""
Set globals as specified by command line
:param globs: global variables
:type globs: dict
The command line option ``--global var=value`` can be used to request the change
of global variables. However, actually changing them requires explicit code
support in the user's program after the ``BDSim`` constructor.
Example::
sim.set_globals(globals())
Messages are displayed by defaulting, indicating which variables are changed,
and their old and new values.
"""
# handle the globals
for s in self.options.setglob:
var, value = s.split("=")
new_value = eval(value)
print(f"changed value of global {var} from {globs[var]} -> {new_value}")
globs[var] = new_value
[docs] def report(self, bd, type="summary", **kwargs):
"""Print block diagram report
:param bd: the block diagram to be reported
:type bd: :class:`BlockDiagram`
:param type: report type, one of: "summary" (default), "lists", "schedule"
:type type: str, optional
:param style: table style, one of: ansi (default), markdown, latex
:type style: str
Single method wrapper for various block diagram reports. Obeys the ``-q``
option to suppress all reports at runtime.
:seealso: :meth:`BlockDiagram.report_summary` :meth:`BlockDiagram.report_lists` :meth:`BlockDiagram.report_schedule`
"""
if self.options.quiet:
return
if type == "lists":
bd.report_lists(**kwargs)
elif type == "summary":
bd.report_summary(**kwargs)
elif type == "schedule":
bd.report_schedule(**kwargs)
class Options(OptionsBase):
def __init__(self, sysargs=True, **options):
default_options = {
"backend": None,
"tiles": "3x4",
"graphics": True,
"animation": False,
"hold": True,
"shape": None,
"altscreen": True,
"progress": True,
"verbose": False,
"debug": "",
"simtime": None,
"blocks": False,
"outfile": None,
"quiet": False,
"setparam": [],
"setglob": [],
}
# modify defaults according to envariable BDSIM which is comma/semicolon
# separated list of key=value pairs
# eg. setenv BDSIM graphics=True,hold=True
env = os.getenv("BDSIM")
if env is not None:
for key_value in env.split(",;"):
# for each key=value pair
key, value = [s.strip() for s in key_value.split("=")]
# attempt an eval, resolves True, False
try:
value = eval(value)
except SyntaxError:
pass
try:
default_options[key] = value
except KeyError:
print("envariable BDSIM, unknown option", key)
if sysargs:
# command line arguments and graphics
parser = argparse.ArgumentParser(
prefix_chars="-+",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
description="Block diagram simulation framework",
epilog=(
"set defaults using environment variable BDSIM as a single string"
" containing command line options"
),
)
parser.add_argument(
"--backend",
"-b",
type=str,
metavar="BACKEND",
help="matplotlib backend to choose",
)
parser.add_argument(
"--tiles",
"-t",
type=str,
metavar="ROWSxCOLS",
help="window tiling as NxM",
)
parser.add_argument(
"--shape",
type=str,
metavar="WIDTHxHEIGHT",
help="window size as WxH, defaults to matplotlib default",
)
parser.add_argument(
"--blocks",
action="store_const",
const=True,
default=False,
dest="blocks",
help="Display blocks at startup",
)
parser.add_argument(
"-g",
"--no-graphics",
action="store_const",
const=False,
dest="graphics",
help="disable graphic display, also does --no-animation",
)
parser.add_argument(
"+g",
"--graphics",
action="store_const",
const=True,
dest="graphics",
help="enable graphic display",
)
parser.add_argument(
"-a",
"--no-animation",
action="store_const",
const=False,
dest="animation",
help="do not animate graphics",
)
parser.add_argument(
"+a",
"--animation",
action="store_const",
const=True,
dest="animation",
help="animate graphics, also does ++graphics",
)
parser.add_argument(
"-H",
"--no-hold",
action="store_const",
const=False,
dest="hold",
help="do not hold graphics in done()",
)
parser.add_argument(
"+H",
"--hold",
action="store_const",
const=True,
dest="hold",
help="hold graphics in done()",
)
parser.add_argument(
"+A",
"--altscreen",
action="store_const",
const=True,
dest="altscreen",
help="display plots on second monitor",
)
parser.add_argument(
"-A",
"--no-altscreen",
action="store_const",
const=False,
dest="altscreen",
help="do not display plots on second monitor",
)
parser.add_argument(
"--no-progress",
"-p",
action="store_const",
const=False,
dest="progress",
help="animate graphics",
)
parser.add_argument(
"--verbose", "-v", action="store_const", const=True, help="debug flags"
)
parser.add_argument(
"--debug",
"-d",
type=str,
metavar="[psd]",
help="debug flags: p/ropagate, s/tate, d/eriv, i/nteractive",
)
parser.add_argument(
"--simtime", "-S", type=str, help="simulation time: T or T,dt"
)
parser.add_argument(
"--quiet",
"-q",
action="store_const",
const=True,
help="suppress reports",
)
parser.add_argument(
"-o",
action="store_const",
const="bd.out",
dest="outfile",
help="output pickled simulation results to bd.out",
)
parser.add_argument(
"--out",
type=str,
dest="outfile",
help="file to save pickled simulation results",
)
parser.add_argument(
"--set",
"-s",
dest="setparam",
action="append",
type=str,
help="override block parameter using block:param=value",
)
parser.add_argument(
"--global",
dest="setglob",
action="append",
type=str,
help="override global parameter using var=value",
)
args, unknownargs = parser.parse_known_args()
cmdline_options = vars(args) # get args as a dictionary
# keep only the options that are not None, ie. those that were
# explicitly set on the command line
cmdline_options = {
option: value
for option, value in cmdline_options.items()
if value is not None
}
if "graphics" in cmdline_options:
# -g or +g present
if not cmdline_options["graphics"]:
# -g then disable animation
cmdline_options["animation"] = False
elif "animation" in cmdline_options and cmdline_options["animation"]:
# +a present
cmdline_options["graphics"] = True
else:
cmdline_options = dict() # empty dictionary
super().__init__(readonly=cmdline_options, args=default_options)
# now handle the passed options
self.set(**options)
if self.verbose:
print(self)
self._argv = unknownargs # save non-bdsim arguments
def sanity(self, options):
# ensure graphics is enabled if animation is requested
# ensure animation is disabled if graphics is disabled
if "graphics" in options and "animation" in options:
if options["animation"] and not options["graphics"]:
raise ValueError("cannot enable animation but disable graphics")
elif "graphics" in options and not options["graphics"]:
options["animation"] = False
elif "animation" in options and options["animation"]:
options["graphics"] = True
return options