"""
Transfer blocks:
- have inputs and outputs
- have state variables
- are a subclass of ``TransferBlock`` |rarr| ``Block``
"""
import numpy as np
import scipy.signal
import math
from math import sin, cos, atan2, sqrt, pi
import matplotlib.pyplot as plt
from spatialmath import base
from bdsim.components import TransferBlock, SubsystemBlock
[docs]
class Integrator(TransferBlock):
r"""
:blockname:`INTEGRATOR`
Continuous-time integrator.
:inputs: 1
:outputs: 1
:states: N
.. list-table::
:header-rows: 1
* - Port type
- Port number
- Types
- Description
* - Input
- 0
- float, ndarray
- :math:`x`
* - Output
- 0
- any
- :math:`y`
Output is the time integral of the input :math:`y(t) = \int_0^T x(t) dt`.
The state can be a scalar or a vector. The initial state, and type, is given by
``x0``. The shape of the input signal must match ``x0``.
The minimum and maximum values can be:
- a scalar, in which case the same value applies to every element of
the state vector, or
- a vector, of the same shape as ``x0`` that applies elementwise to
the state.
.. note:: The minimum and maximum prevent integration outside the limits,
but assume that the initial state is within the limits.
Integration can be controlled by an ``enable`` function::
enable(t, u, x): bool
where the arguments are current time, a list of inputs to the block and the state
as an ndarray. If the function returns False then the integrator's output is set
to zero.
.. todo:: Make enable an optional input to the block, or the input for a subclassed
variant of the block.
:seealso: :class:`Deriv`
"""
nin = 1
nout = 1
[docs]
def __init__(self, x0=0, gain=1.0, min=None, max=None, enable=None, **blockargs):
"""
:param x0: Initial state, defaults to 0
:type x0: array_like, optional
:param gain: gain or scaling factor, defaults to 1
:type gain: float
:param min: Minimum value of state, defaults to None
:type min: float or array_like, optional
:param max: Maximum value of state, defaults to None
:type max: float or array_like, optional
:param enable: enable or disable integration
:type enable: callable
:param blockargs: |BlockOptions|
:type blockargs: dict
"""
super().__init__(**blockargs)
if isinstance(x0, (int, float)):
x0 = np.r_[x0]
elif isinstance(x0, np.ndarray):
if x0.ndim > 1:
raise ValueError("state must be a 1D vector")
else:
x0 = base.getvector(x0)
self.nstates = x0.shape[0]
if min is not None:
min = base.getvector(min, self.nstates)
if max is not None:
max = base.getvector(max, self.nstates)
self._x0 = x0
self.min = min
self.max = max
self.gain = gain
self._x0 = x0
self.min = min
self.max = max
self.enable = enable
if enable is not None and not callable(enable):
raise ValueError("enable must be callable")
# print("nstates", self.nstates)
def output(self, t, u, x):
return [self.gain * x]
def deriv(self, t, u, x):
xd = base.getvector(u[0])
if self.enable is not None and not self.enable(t, u, x):
# if enable function returns False then integrator output is jammed at zero
self._x = np.zeros(x.shape)
return np.zeros(x.shape)
if self.min is not None:
xd[x < self.min] = 0
if self.max is not None:
xd[x > self.max] = 0
return self.gain * xd
[docs]
class PoseIntegrator(TransferBlock):
r"""
:blockname:`POSEINTEGRATOR`
Continuous-time pose integrator
:inputs: 1
:outputs: 1
:states: 6
.. list-table::
:header-rows: 1
* - Port type
- Port number
- Types
- Description
* - Input
- 0
- ndarray(6,)
- :math:`x`
* - Output
- 0
- SE3
- :math:`y`
This block integrates spatial velocity over time. The block input is a spatial
velocity as a 6-vector :math:`(v_x, v_y, v_z, \omega_x, \omega_y, \omega_z)` and the
output is pose as an ``SE3`` instance.
.. note:: The state vector is a velocity twist.
"""
nin = 1
nout = 1
[docs]
def __init__(self, x0=None, **blockargs):
r"""
:param x0: Initial pose, defaults to null
:type x0: SE3, Twist3, optional
:param blockargs: |BlockOptions|
:type blockargs: dict
"""
super().__init__(**blockargs)
if x0 is None:
x0 = np.zeros((6,))
self.nstates = len(x0)
self._x0 = x0
def output(self, t, u, x):
return [Twist3(x).SE3(1)]
def deriv(self, t, u, x):
return u[0]
# ------------------------------------------------------------------------ #
[docs]
class LTI_SS(TransferBlock):
r"""
:blockname:`LTI_SS`
Continuous-time state-space LTI dynamics
:inputs: 1
:outputs: 1
:states: N
.. list-table::
:header-rows: 1
* - Port type
- Port number
- Types
- Description
* - Input
- 0
- float, ndarray
- :math:`u`
* - Output
- 0
- float, ndarray
- :math:`y`
Implements the dynamics of a multi-input multi-output (MIMO) linear
time invariant (LTI) system described in statespace form. The dynamics are given by
.. math::
\dot{x} &= A x + B u
y &= C x
The order of the states in ``x0`` is consistent with controller canonical
form. A direct passthrough component, typically :math:`D`, is not allowed in order
to avoid algebraic loops.
Examples::
lti = bd.LTI_SS(A=-2, B=1, C=-1)
is the system :math:`\dot{x}=-2x+u, y=-x`.
"""
nin = 1
nout = 1
[docs]
def __init__(self, A=None, B=None, C=None, x0=None, **blockargs):
r"""
:param N: numerator coefficients, defaults to 1
:type N: array_like, optional
:param D: denominator coefficients, defaults to [1,1]
:type D: array_like, optional
:param x0: initial states, defaults to None
:type x0: array_like, optional
:param blockargs: |BlockOptions|
:type blockargs: dict
"""
# print('in SS constructor')
assert A.shape[0] == A.shape[1], "A must be square"
n = A.shape[0]
if len(B.shape) == 1:
nin = 1
B = B.reshape((n, 1))
else:
nin = B.shape[1]
assert B.shape[0] == n, "B must have same number of rows as A"
if len(C.shape) == 1:
nout = 1
assert C.shape[0] == n, "C must have same number of columns as A"
C = C.reshape((1, n))
else:
nout = C.shape[0]
assert C.shape[1] == n, "C must have same number of columns as A"
super().__init__(**blockargs)
self.A = A
self.B = B
self.C = C
self.nstates = A.shape[0]
if x0 is None:
self._x0 = np.zeros((self.nstates,))
else:
self._x0 = x0
def output(self, t, u, x):
return list(self.C @ x)
def deriv(self, t, u, x):
# Reshape u and x to (N,1), i.e. column vectors, so
# no problems with broadcasting between A@x and B@u
x = x.reshape(-1, 1)
u = np.array(u).reshape(-1, 1)
xd = self.A @ x + self.B @ u
return xd.flatten()
# ------------------------------------------------------------------------ #
[docs]
class LTI_SISO(LTI_SS):
r"""
:blockname:`LTI_SISO`
Continuous-time SISO LTI dynamics.
:inputs: 1
:outputs: 1
:states: N
.. list-table::
:header-rows: 1
* - Port type
- Port number
- Types
- Description
* - Input
- 0
- float
- :math:`u`
* - Output
- 0
- float
- :math:`y`
Implements the dynamics of a single-input single-output (SISO) linear
time invariant (LTI) system described by numerator and denominator
polynomial coefficients. The dynamics are given by
.. math::
\frac{Y(s)}{U(s)} = \frac{N(s)}{D(s)}
Coefficients are given in the order from highest order to zeroth
order, ie. :math:`2s^2 - 4s +3` is ``[2, -4, 3]``.
Only proper transfer functions, where order of numerator is less
than denominator are allowed.
The order of the states in ``x0`` is consistent with controller canonical
form.
Examples::
lti = bd.LTI_SISO(N=[1, 2], D=[2, 3, -4])
is the transfer function :math:`\frac{s+2}{2s^2+3s-4}`.
"""
nin = 1
nout = 1
[docs]
def __init__(self, N=1, D=[1, 1], x0=None, **blockargs):
r"""
:param N: numerator coefficients, defaults to 1
:type N: array_like, optional
:param D: denominator coefficients, defaults to [1,1]
:type D: array_like, optional
:param x0: initial states, defaults to None
:type x0: array_like, optional
:param blockargs: |BlockOptions|
:type blockargs: dict
:return: LTI_SISO block
:rtype: ``LTI_SISO`` instance
"""
# print('in SISO constscutor')
if not isinstance(N, list):
N = [N]
if not isinstance(D, list):
D = [D]
self.N = N
self.D = N
n = len(D) - 1
nn = len(N)
if x0 is None:
x0 = np.zeros((n,))
assert nn <= n, "direct pass through is not supported"
# convert to numpy arrays
# N = np.r_[np.zeros((len(D) - len(N),)), np.array(N)]
N = np.array(N)
D = np.array(D)
# normalize the coefficients to obtain
#
# b_0 s^n + b_1 s^(n-1) + ... + b_n
# ---------------------------------
# a_0 s^n + a_1 s^(n-1) + ....+ a_n
# normalize so leading coefficient of denominator is one
# D0 = D[0]
# D = D / D0
# N = N / D0
# A = np.eye(len(D) - 1, k=1) # control canonic (companion matrix) form
# A[-1, :] = -D[1:]
# B = np.zeros((n, 1))
# B[-1] = 1
# C = (N[1:] - N[0] * D[1:]).reshape((1, n))
A, B, C, D = scipy.signal.tf2ss(N, D)
self.num = N
self.den = D
if len(np.flatnonzero(D)) > 0:
raise ValueError("D matrix is not zero")
super().__init__(A=A, B=B, C=C, x0=x0, **blockargs)
if self.verbose:
print("A=", A)
print("B=", B)
print("C=", C)
def change_param(self, param, newvalue):
if param == "num":
self.num = newvalue
elif param == "den":
self.den = newvalue
self.A, self.B, self.C, self.D = scipy.signal.tf2ss(self.num, self.den)
self.add_param("num", change_param)
self.add_param("den", change_param)
# ------------------------------------------------------------------------ #
from bdsim.blocks.connections import SubSystem
[docs]
class Deriv(SubsystemBlock):
r"""
:blockname:`DERIV`
Continuous-time derivative.
:inputs: 1
:outputs: 1
:states: N
.. list-table::
:header-rows: 1
* - Port type
- Port number
- Types
- Description
* - Input
- 0
- float
- :math:`x`
* - Output
- 0
- float
- :math:`y`
Implements the dynamics of a derivative filter, but to be causal it has a single
pole given by ``alpha``. The dynamics is
.. math:: \frac{s}{\frac{s}{\alpha} + 1}
It is implemented as a subsystem with an integrator and a feedback loop. The
initial state of the integrator is given by ``x0``.
If the initial output of the derivative block is known it can be provided as ``y0``
which is related to ``x0`` by :math:`x_0 = - \alpha y_0`.
:seealso: :class:`Integrator`
"""
nin = 1
nout = 1
[docs]
def __init__(self, alpha, x0=0, y0=None, **blockargs):
r"""
:param alpha: filter pole in units of rad/s
:type alpha: float
:param x0: initial states, defaults to 0
:type x0: array_like, optional
:param y0: inital outputs
:type y0: array_like
:param blockargs: |BlockOptions|
:type blockargs: dict
"""
super().__init__(**blockargs)
self.type = "subsystem"
bd = self.bd.runtime.blockdiagram()
if y0 is not None:
x0 = -y0 * alpha
integrator = bd.INTEGRATOR(x0=x0)
inp = bd.INPORT(1)
outp = bd.OUTPORT(1)
sum = bd.SUM("+-")
gain = bd.GAIN(1.0 / alpha)
bd.connect(inp, sum[0])
bd.connect(sum[0], gain)
bd.connect(gain, outp, integrator)
bd.connect(integrator, sum[1])
# get references to the input and output port blocks
self.inport = inp
self.outport = outp
self.subsystem = bd
self.ssname = "derivative"
# ------------------------------------------------------------------------ #
[docs]
class PID(SubsystemBlock):
r"""
:blockname:`PID`
Continuous-time PID control.
:inputs: 2
:outputs: 1
:states: 2
.. list-table::
:header-rows: 1
* - Port type
- Port number
- Types
- Description
* - Input
- 0
- float
- :math:`x`, plant output
* - Input
- 1
- float
- :math:`x^*`, demanded output
* - Output
- 0
- any
- :math:`u`, control to plant
Implements the dynamics of a PID controller:
.. math::
e &= x^* - x
u &= Pe + D \frac{d}{dt} e + I \int e dt
If the I or D terms are not required the ``type`` can be specified as ``"PD"`` or
``"PI"`` in which case the respective gain terms ``I`` and ``D`` will be ignored.
To reduce noise the derivative is computed by the DERIV block which implements a
first-order system
.. math::
\frac{s}{s/a + 1}
where the pole :math:`a=` ``D_filt`` can be positioned appropriately.
If ``I_limit`` is provided it specifies the limits of the integrator
state, before multiplication by ``I``. If ``I_limit`` is:
* a scalar :math:`a` the integrator state is clipped to the interval :math:`[-a, a]`
* a 2-tuple :math:`(a,b)` the integrator state is clipped to the interval :math:`[a, b]`
If ``I_band`` is provided the integrator is reset to zero whenever the
error :math:`e` is outside the band given by ``I_band`` which is:
* a scalar :math:`s` the band is the interval :math:`[-s, s]`
* a 2-tuple :math:`(a,b)` the band is the interval :math:`[a, b]`
Examples::
pid = bd.PID(P=3, D=2, I=1)
..note:: The result is a subsystem which will be expanded into 12 blocks for the
PID case, fewer for the PI or PD cases.
:seealso: :class:`Deriv`
"""
nin = 2
nout = 1
[docs]
def __init__(
self,
type: str = "PID",
P: float = 0.0,
D: float = 0.0,
I: float = 0.0,
D_pole=1,
I_limit=None,
I_band=None,
**blockargs,
):
r"""
:param type: the controller type, defaults to "PID"
:type type: str, optional
:param P: proportional gain, defaults to 0
:type P: float
:param D: derivative gain, defaults to 0
:type D: float
:param I: integral gain, defaults to 0
:type I: float
:param D_pole: filter pole for derivative estimate, defaults to 1 rad/s
:type D_pole: float
:param I_limit: integral limit
:type I_limit: float or 2-tuple
:param I_band: band within which integral action is active
:type I_band: float
:param blockargs: |BlockOptions|
:type blockargs: dict
"""
super().__init__(**blockargs)
self.type = "subsystem"
subsystem = self.bd.runtime.blockdiagram()
bd = blockargs["bd"]
blockargs.pop("bd")
Pblock = subsystem.GAIN(P) # proportional gain block
if "I" in type:
# if the I term is required, create the block
if I_limit is None:
min = -np.inf
max = np.inf
elif isinstance(I_limit, float):
min = -I_limit
max = I_limit
elif isinstance(I_limit, tuple) and len(I_limit) == 2:
min, max = I_limit
if I_band is not None:
def ifunc(t, u, x):
return abs(u[0]) < I_band
else:
ifunc = None
# integrator block
Iblock = subsystem.INTEGRATOR(min=min, max=max, gain=I, enable=ifunc)
if "D" in type:
# if the D term is required, create the blocks
Dblock = subsystem.DERIV(alpha=D_pole) # derivative block
Dgain = subsystem.GAIN(D) # derivative gain
subsystem.connect(Dblock, Dgain)
error_sum = subsystem.SUM("-+", name="errsum") # error summing junction
inp = subsystem.INPORT(2) # PID block inputs
outp = subsystem.OUTPORT(1) # PID block output
# for each case sum the various terms
if type == "PID":
out_sum = subsystem.SUM("+++", name="outsum")
subsystem.connect(error_sum, Pblock, Dblock, Iblock)
subsystem.connect(Pblock, out_sum[0])
subsystem.connect(Iblock, out_sum[1])
subsystem.connect(Dgain, out_sum[2])
elif type == "PI":
out_sum = subsystem.SUM("++", name="outsum")
subsystem.connect(error_sum, Pblock, Iblock)
subsystem.connect(Pblock, out_sum[0])
subsystem.connect(Iblock, out_sum[1])
elif type == "PD":
out_sum = subsystem.SUM("++", name="outsum")
subsystem.connect(error_sum, Pblock, Dblock)
subsystem.connect(Pblock, out_sum[0])
subsystem.connect(Dgain, out_sum[1])
subsystem.connect(inp, error_sum)
subsystem.connect(out_sum, outp)
subsystem.report()
# super().__init__(subsystem, name=name, bd=bd)
# get references to the input and output port blocks
self.inport = inp
self.outport = outp
self.subsystem = subsystem
self.ssname = "PID"
if __name__ == "__main__":
from bdsim import BDSim
sim = BDSim(hold=False)
bd = sim.blockdiagram()
deriv = bd.DERIV(alpha=0.1, verbose=True)
c = bd.WAVEFORM(wave="sine", freq=1)
s = bd.SCOPE(2)
bd.connect(c, deriv, s[0])
bd.connect(deriv, s[1])
bd.compile()
bd.report_summary()
out = sim.run(bd, 10, dt=0.02)
import matplotlib.pyplot as plt
plt.plot(out.t, out.x)
sim.done(bd, block=True)
# sim = BDSim()
# bd = sim.blockdiagram()
# pid = bd.PID(P=2, D=0.01, verbose=True)
# c = bd.CONSTANT(1)
# s = bd.SCOPE()
# bd.connect(c, pid)
# bd.connect(pid, s)
# bd.compile(report=True)
# bd.report()
# from pathlib import Path
# exec(
# open(Path(__file__).parent.parent.parent / "tests" / "test_transfers.py").read()
# )