"""
Python EKF localization, map making and SLAM
@Author: Peter Corke, original MATLAB code and Python version
@Author: Kristian Gibson, initial port from MATLAB
"""
from collections import namedtuple
import numpy as np
from math import pi
from scipy import integrate
from scipy.linalg import sqrtm, block_diag
from scipy.stats.distributions import chi2
import matplotlib.pyplot as plt
from matplotlib import animation
from spatialmath.base.animate import Animate
from spatialmath import base, SE2
from roboticstoolbox.mobile import VehicleBase
from roboticstoolbox.mobile.landmarkmap import LandmarkMap
from roboticstoolbox.mobile.sensors import SensorBase
class EKF:
def __init__(
self,
robot,
sensor=None,
map=None,
P0=None,
x_est=None,
joseph=True,
animate=True,
x0=[0, 0, 0],
verbose=False,
history=True,
workspace=None,
):
r"""
Extended Kalman filter
:param robot: robot motion model
:type robot: 2-tuple
:param sensor: vehicle mounted sensor model, defaults to None
:type sensor: 2-tuple, optional
:param map: landmark map, defaults to None
:type map: :class:`LandmarkMap`, optional
:param P0: initial covariance matrix, defaults to None
:type P0: ndarray(n,n), optional
:param x_est: initial state estimate, defaults to None
:type x_est: array_like(n), optional
:param joseph: use Joseph update of covariance, defaults to True
:type joseph: bool, optional
:param animate: show animation of vehicle motion, defaults to True
:type animate: bool, optional
:param x0: initial EKF state, defaults to [0, 0, 0]
:type x0: array_like(3), optional
:param verbose: display extra debug information, defaults to False
:type verbose: bool, optional
:param history: retain step-by-step history, defaults to True
:type history: bool, optional
:param workspace: dimension of workspace, see :func:`~spatialmath.base.graphics.expand_dims`
:type workspace: scalar, array_like(2), array_like(4)
This class solves several classical robotic estimation problems, which are
selected according to the arguments:
====================== ====== ========== ========== ======= ======
Problem len(x) ``robot`` ``sensor`` ``map`` ``P0``
====================== ====== ========== ========== ======= ======
Dead reckoning 3 (veh,V) None None P0
Map-based localization 3 (veh,V) (smodel,W) yes P0
Map creation 2N (veh,None) (smodel,W) None None
SLAM 3+2N (veh,V) (smodel,W) None P0
====================== ====== ========== ========== ======= ======
where:
- ``veh`` models the robotic vehicle kinematics and odometry and is a :class:`VehicleBase` subclass
- ``V`` is the estimated odometry (process) noise covariance as an ndarray(2,2)
- ``smodel`` models the robot mounted sensor and is a :class:`SensorBase` subclass
- ``W`` is the estimated sensor (measurement) noise covariance as an ndarray(2,2)
The state vector has different lengths depending on the particular
estimation problem, see below.
At each iteration of the EKF:
- invoke the step method of the ``robot``:
  - obtain the next control input from the driver agent and apply it
    as the vehicle control input
  - the vehicle returns a noisy odometry estimate
- the state prediction is computed from the odometry
- the true pose is used to determine a noisy sensor observation
- the state is corrected and new landmarks are added to the map
The working area of the robot is defined by ``workspace`` or inherited
from the landmark map attached to the ``sensor`` (see
:func:`~spatialmath.base.graphics.expand_dims`):
============== ======= =======
``workspace`` x-range y-range
============== ======= =======
A (scalar) -A:A -A:A
[A, B] A:B A:B
[A, B, C, D] A:B C:D
============== ======= =======
**Dead-reckoning localization**
The state :math:`\vec{x} = (x, y, \theta)` is the estimated vehicle
configuration.
Create a vehicle with odometry covariance ``V``, add a driver to it,
run the Kalman filter with estimated covariances ``V`` and initial
vehicle state covariance ``P0``::
V = np.diag([0.02, np.radians(0.5)]) ** 2
robot = Bicycle(covar=V)
robot.control = RandomPath(workspace=10)
x_sdev = [0.05, 0.05, np.radians(0.5)]
P0 = np.diag(x_sdev) ** 2
ekf = EKF(robot=(robot, V), P0=P0)
ekf.run(T=20) # run the simulation for 20 seconds
robot.plot_xy(color="b") # plot the true vehicle path
ekf.plot_xy(color="r") # overlay the estimated path
ekf.plot_ellipse(filled=True, facecolor="g", alpha=0.3) # overlay uncertainty ellipses
# plot the covariance against time
t = ekf.get_t()
pn = ekf.get_Pnorm()
plt.plot(t, pn)
**Map-based vehicle localization**
The state :math:`\vec{x} = (x, y, \theta)` is the estimated vehicle
configuration.
Create a vehicle with odometry covariance ``V``, add a driver to it,
create a map with 20 point landmarks, create a sensor that uses the map
and vehicle state to estimate landmark range and bearing with covariance
``W``, create the Kalman filter with estimated covariances ``V`` and ``W`` and
initial vehicle state covariance ``P0``::
V = np.diag([0.02, np.radians(0.5)]) ** 2
robot = Bicycle(covar=V)
robot.control = RandomPath(workspace=10)
map = LandmarkMap(nlandmarks=20, workspace=10)
W = np.diag([0.1, np.radians(1)]) ** 2
sensor = RangeBearingSensor(robot=robot, map=map, covar=W, angle=[-np.pi/2, np.pi/2], range=4, animate=True)
x_sdev = [0.05, 0.05, np.radians(0.5)]
P0 = np.diag(x_sdev) ** 2
ekf = EKF(robot=(robot, V), P0=P0, map=map, sensor=(sensor, W))
ekf.run(T=20) # run the simulation for 20 seconds
map.plot() # plot the map
robot.plot_xy(color="b") # plot the true vehicle path
ekf.plot_xy(color="r") # overlay the estimated path
ekf.plot_ellipse() # overlay uncertainty ellipses
# plot the covariance against time
t = ekf.get_t()
pn = ekf.get_Pnorm()
plt.plot(t, pn)
**Vehicle-based map making**
The state :math:`\vec{x} = (x_0, y_0, \dots, x_{N-1}, y_{N-1})` is the
estimated landmark positions where :math:`N` is the number of landmarks.
The state vector is initially empty, and is extended by 2 elements every
time a new landmark is observed.
Create a vehicle with perfect odometry (no covariance), add a driver to it,
create a sensor that uses the map and vehicle state to estimate landmark range
and bearing with covariance ``W``, create the Kalman filter with estimated
sensor covariance ``W``, then run the filter for 20 seconds::
robot = Bicycle()
robot.control = RandomPath(workspace=10)
map = LandmarkMap(nlandmarks=20, workspace=10, seed=0)
W = np.diag([0.1, np.radians(1)]) ** 2
sensor = RangeBearingSensor(robot=robot, map=map, covar=W)
ekf = EKF(robot=(robot, None), sensor=(sensor, W))
ekf.run(T=20) # run the simulation for 20 seconds
map.plot() # plot the map
robot.plot_xy(color="b") # plot the true vehicle path
**Simultaneous localization and mapping (SLAM)**
The state :math:`\vec{x} = (x, y, \theta, x_0, y_0, \dots, x_{N-1},
y_{N-1})` is the estimated vehicle configuration followed by the
estimated landmark positions where :math:`N` is the number of landmarks.
The state vector is initially of length 3, and is extended by 2 elements
every time a new landmark is observed.
Create a vehicle with odometry covariance ``V``, add a driver to it,
create a map with 20 point landmarks, create a sensor that uses the map
and vehicle state to estimate landmark range and bearing with covariance
``W``, create the Kalman filter with estimated covariances ``V`` and ``W`` and
initial state covariance ``P0``, then run the filter to estimate the
vehicle state at each time step and the map::
V = np.diag([0.02, np.radians(0.5)]) ** 2
robot = Bicycle(covar=V)
robot.control = RandomPath(workspace=10)
map = LandmarkMap(nlandmarks=20, workspace=10)
W = np.diag([0.1, np.radians(1)]) ** 2
sensor = RangeBearingSensor(robot=robot, map=map, covar=W, angle=[-np.pi/2, np.pi/2], range=4, animate=True)
P0 = np.diag([0.05, 0.05, np.radians(0.5)]) ** 2
ekf = EKF(robot=(robot, V), P0=P0, sensor=(sensor, W))
ekf.run(T=20) # run the simulation for 20 seconds
map.plot()              # plot true map
ekf.plot_map()          # plot estimated landmark positions
robot.plot_xy()         # plot true path
ekf.plot_xy()           # plot estimated robot path
ekf.plot_ellipse()      # plot estimated covariance
# plot the covariance against time
t = ekf.get_t()
pn = ekf.get_Pnorm()
plt.plot(t, pn)
:seealso: :meth:`run`
"""
if robot is not None:
if (
not isinstance(robot, tuple)
or len(robot) != 2
or not isinstance(robot[0], VehicleBase)
):
raise TypeError("robot must be tuple (vehicle, V_est)")
self._robot = robot[0] # reference to the robot vehicle
self._V_est = robot[1] # estimate of vehicle state covariance V
if sensor is not None:
if (
not isinstance(sensor, tuple)
or len(sensor) != 2
or not isinstance(sensor[0], SensorBase)
):
raise TypeError("sensor must be tuple (sensor, W_est)")
self._sensor = sensor[0] # reference to the sensor
self._W_est = sensor[1] # estimate of sensor covariance W
else:
self._sensor = None
self._W_est = None
if map is not None and not isinstance(map, LandmarkMap):
raise TypeError("map must be LandmarkMap instance")
self._ekf_map = map # prior map for localization
if animate:
if map is not None:
self._workspace = map.workspace
self._robot._workspace = map.workspace
elif sensor is not None:
self._workspace = sensor[0].map.workspace
self._robot._workspace = sensor[0].map.workspace
elif self.robot.workspace is None:
raise ValueError("for animation robot must have a defined workspace")
self.animate = animate
self._P0 = P0 # initial system covariance
self._x0 = x0 # initial vehicle state
self._x_est = x_est # estimated state
self._landmarks = None # ekf_map state
self._est_vehicle = False
self._est_ekf_map = False
if self._V_est is not None:
# estimating vehicle pose by:
# - DR if sensor is None
# - localization if sensor is not None and map is not None
self._est_vehicle = True
# the map is estimated if there is no prior map but a sensor is present
if map is None and sensor is not None:
# estimating ekf_map
self._est_ekf_map = True
self._joseph = joseph # flag: use Joseph form to compute p
self._verbose = verbose
self._keep_history = history # keep history
self._htuple = namedtuple("EKFlog", "t xest odo P innov S K lm z")
if workspace is not None:
self._dim = base.expand_dims(workspace)
elif self.sensor is not None:
self._dim = self.sensor.map.workspace
else:
self._dim = self._robot.workspace
# self.robot.init()
# # clear the history
# self._history = []
if self.V_est is None:
# perfect vehicle case
self._est_vehicle = False
self._x_est = None
self._P_est = None
else:
# noisy odometry case
if self.V_est.shape != (2, 2):
raise ValueError("vehicle state covariance V_est must be 2x2")
self._x_est = self.robot.x
self._P_est = P0
self._est_vehicle = True
if self.W_est is not None:
if self.W_est.shape != (2, 2):
raise ValueError("sensor covariance W_est must be 2x2")
# if np.any(self._sensor):
# self._landmarks = None*np.zeros(2, self._sensor.ekf_map.nlandmarks)
# # check types for passed objects
# if np.any(self._map) and not isinstance(self._map, 'LandmarkMap'):
# raise ValueError('expecting LandmarkMap object')
# if np.any(sensor) and not isinstance(sensor, 'Sensor'):
# raise ValueError('expecting Sensor object')
self.init()
self.xxdata = ([], [])
def __str__(self):
s = f"EKF object: {len(self._x_est)} states"
def indent(s, n=2):
spaces = " " * n
return s.replace("\n", "\n" + spaces)
estimating = []
if self._est_vehicle:
estimating.append("vehicle pose")
if self._est_ekf_map:
estimating.append("map")
if len(estimating) > 0:
s += ", estimating: " + ", ".join(estimating)
if self.robot is not None:
s += indent("\nrobot: " + str(self.robot))
if self.V_est is not None:
s += indent("\nV_est: " + base.array2str(self.V_est))
if self.sensor is not None:
s += indent("\nsensor: " + str(self.sensor))
if self.W_est is not None:
s += indent("\nW_est: " + base.array2str(self.W_est))
return s
def __repr__(self):
return str(self)
@property
def x_est(self):
"""
Get EKF state
:return: state vector
:rtype: ndarray(n)
Returns the value of the estimated state vector at the end of
simulation. The dimensions depend on the problem being solved.
"""
return self._x_est
@property
def P_est(self):
"""
Get EKF covariance
:return: covariance matrix
:rtype: ndarray(n,n)
Returns the value of the estimated covariance matrix at the end of
simulation. The dimensions depend on the problem being solved.
"""
return self._P_est
@property
def P0(self):
"""
Get initial EKF covariance
:return: covariance matrix
:rtype: ndarray(n,n)
Returns the value of the covariance matrix passed to the constructor.
"""
return self._P0
@property
def V_est(self):
"""
Get estimated odometry covariance
:return: odometry covariance
:rtype: ndarray(2,2)
Returns the value of the estimated odometry covariance matrix passed to
the constructor
"""
return self._V_est
@property
def W_est(self):
"""
Get estimated sensor covariance
:return: sensor covariance
:rtype: ndarray(2,2)
Returns the value of the estimated sensor covariance matrix passed to
the constructor
"""
return self._W_est
@property
def robot(self):
"""
Get robot object
:return: robot used in simulation
:rtype: :class:`VehicleBase` subclass
"""
return self._robot
@property
def sensor(self):
"""
Get sensor object
:return: sensor used in simulation
:rtype: :class:`SensorBase` subclass
"""
return self._sensor
@property
def map(self):
"""
Get map object
:return: map used in simulation
:rtype: :class:`LandmarkMap` subclass
"""
return self._ekf_map
@property
def verbose(self):
"""
Get verbosity state
:return: verbosity
:rtype: bool
"""
return self._verbose
@property
def history(self):
"""
Get EKF simulation history
:return: simulation history
:rtype: list of namedtuples
At each simulation timestep a namedtuple is appended to the history
list. It contains, for that time step, estimated state and covariance,
and sensor observation.
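The namedtuple has fields ``t``, ``xest``, ``odo``, ``P``, ``innov``, ``S``,
``K``, ``lm`` and ``z``.  For example, a minimal sketch of printing the
innovation magnitude at every step where a landmark was observed::

for h in ekf.history:
    if h.innov is not None:
        print(f"{h.t:.2f}: |innov| = {np.linalg.norm(h.innov):.3f}")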
:seealso: :meth:`get_t` :meth:`get_xyt` :meth:`get_map` :meth:`get_P`
:meth:`get_Pnorm`
"""
return self._history
@property
def workspace(self):
"""
Size of robot workspace
:return: workspace bounds [xmin, xmax, ymin, ymax]
:rtype: ndarray(4)
Returns the bounds of the workspace as specified by the constructor
option ``workspace``
"""
return self._workspace
@property
def landmarks(self):
"""
Get landmark information
:return: landmark information
:rtype: dict
The dictionary is indexed by the landmark id and gives a 2-element list:
- order in which the landmark was first seen
- number of times the landmark has been seen
The first observed landmark has order 0 and so on.
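For example, a minimal sketch of listing the landmark table after a run::

for id, (order, nseen) in ekf.landmarks.items():
    print(f"landmark {id}: first seen as number {order}, seen {nseen} times")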
:seealso: :meth:`landmark`
"""
return self._landmarks
def landmark(self, id):
"""
Landmark information
:param id: landmark index
:type id: int
:return: order in which it was first seen, number of times seen
:rtype: int, int
The first observed landmark has order 0 and so on.
:seealso: :meth:`landmarks` :meth:`landmark_index` :meth:`landmark_mindex`
"""
try:
l = self._landmarks[id]
return l[0], l[1]
except KeyError:
raise ValueError(f"unknown landmark {id}") from None
def landmark_index(self, id):
"""
Landmark index in complete state vector
:param id: landmark id
:type id: int
:return: index in the state vector
:rtype: int
The return value ``j`` is the index of the x-coordinate of the landmark
in the EKF state vector, and ``j+1`` is the index of the y-coordinate.
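For example, a minimal sketch of extracting the estimated position of
landmark 5 from the state vector (equivalent to :meth:`landmark_x`)::

jx = ekf.landmark_index(5)
xy = ekf.x_est[jx:jx + 2]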
:seealso: :meth:`landmark`
"""
try:
jx = self._landmarks[id][0] * 2
if self._est_vehicle:
jx += 3
return jx
except KeyError:
raise ValueError(f"unknown landmark {id}") from None
def landmark_mindex(self, id):
"""
Landmark index in map state vector
:param id: landmark id
:type id: int
:return: index in the state vector
:rtype: int
The return value ``j`` is the index of the x-coordinate of the landmark
in the map vector, and ``j+1`` is the index of the y-coordinate.
:seealso: :meth:`landmark`
"""
try:
return self._landmarks[id][0] * 2
except KeyError:
raise ValueError(f"unknown landmark {id}") from None
def landmark_x(self, id):
"""
Landmark position
:param id: landmark index
:type id: int
:return: landmark position :math:`(x,y)`
:rtype: ndarray(2)
Returns the landmark position from the current state vector.
"""
jx = self.landmark_index(id)
return self._x_est[jx : jx + 2]
def init(self):
"""
Reset the filter

Reinitializes the robot and sensor, resets the filter state and clears the
landmark table and history.
"""
self.robot.init()
if self.sensor is not None:
self.sensor.init()
# clear the history
self._history = []
if self._V_est is None:
# perfect vehicle case
self._est_vehicle = False
self._x_est = np.empty((0,))
self._P_est = np.empty((0, 0))
else:
# noisy odometry case
self._x_est = np.array(self._x0, dtype=float)
self._P_est = self._P0
self._est_vehicle = True
if self.sensor is not None:
# landmark dictionary maps lm_id to list[index, nseen]
self._landmarks = {}
# np.full((2, len(self.sensor.map)), -1, dtype=int)
def run(self, T, animate=False):
"""
Run the EKF simulation
:param T: maximum simulation time in seconds
:type T: float
:param animate: animate motion of vehicle, defaults to False
:type animate: bool, optional
Simulates the motion of a vehicle (under the control of a driving agent)
and the EKF estimator. The steps are:
- initialize the filter, vehicle and vehicle driver agent, sensor
- for each time step:
- step the vehicle and its driver agent, obtain odometry
- take a sensor reading
- execute the EKF
- save information as a namedtuple to the history list for later display
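For example, a minimal sketch of a map-based localization run, assuming
``robot``, ``sensor``, ``map``, ``V``, ``W`` and ``P0`` are constructed as in
the class documentation::

ekf = EKF(robot=(robot, V), P0=P0, map=map, sensor=(sensor, W))
ekf.run(T=20)    # simulate for 20 seconds
ekf.plot_xy()    # plot the estimated vehicle path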
:seealso: :meth:`history` :meth:`landmark` :meth:`landmarks`
:meth:`get_xyt` :meth:`get_t` :meth:`get_map` :meth:`get_P` :meth:`get_Pnorm`
:meth:`plot_xy` :meth:`plot_ellipse` :meth:`plot_error` :meth:`plot_map`
:meth:`run_animation`
"""
self.init()
if animate:
if self.sensor is not None:
self.sensor.map.plot()
plt.xlabel("X")
plt.ylabel("Y")
for k in range(round(T / self.robot.dt)):
if animate:
# self.robot.plot()
self.robot._animation.update(self.robot.x)
self.step()
def run_animation(self, T=10, x0=None, control=None, format=None, file=None):
r"""
Run the EKF simulation
:param T: maximum simulation time in seconds
:type T: float
:param format: Output format
:type format: str, optional
:param file: File name
:type file: str, optional
:return: Matplotlib animation object
:rtype: :meth:`matplotlib.animation.FuncAnimation`
Simulates the motion of a vehicle (under the control of a driving agent)
and the EKF estimator for ``T`` seconds and returns an animation
in various formats:
============ ========= ==========================================================
``format``   ``file``  description
============ ========= ==========================================================
``"html"``   str, None HTML5 video, returned as a string or written to ``file``
``"jshtml"`` str, None JS+HTML video, returned as a string or written to ``file``
``"gif"``    str       animated GIF written to ``file``
``"mp4"``    str       MP4/H264 video written to ``file``
``None``               return a ``FuncAnimation`` object
============ ========= ==========================================================
If ``file`` is ``None`` the video is returned as a string, otherwise it is
written to the named file.
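For example, a minimal sketch of displaying the animation inline, assuming a
Jupyter notebook environment::

from IPython.display import HTML
HTML(ekf.run_animation(T=20, format="jshtml"))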
The simulation steps are:
- initialize the filter, vehicle and vehicle driver agent, sensor
- for each time step:
- step the vehicle and its driver agent, obtain odometry
- take a sensor reading
- execute the EKF
- save information as a namedtuple to the history list for later display
:seealso: :meth:`history` :meth:`landmark` :meth:`landmarks`
:meth:`get_xyt` :meth:`get_t` :meth:`get_map` :meth:`get_P` :meth:`get_Pnorm`
:meth:`plot_xy` :meth:`plot_ellipse` :meth:`plot_error` :meth:`plot_map`
:meth:`run_animation`
"""
fig, ax = plt.subplots()
def init():
self.init()
if self.sensor is not None:
self.sensor.map.plot()
ax.set_xlabel("X")
ax.set_ylabel("Y")
def animate(i):
self.robot._animation.update(self.robot.x)
self.step(pause=False)
nframes = round(T / self.robot.dt)
anim = animation.FuncAnimation(
fig=fig,
func=animate,
init_func=init,
frames=nframes,
interval=self.robot.dt * 1000,
blit=False,
repeat=False,
)
ret = None
if format == "html":
ret = anim.to_html5_video() # convert to embeddable HTML5 animation
elif format == "jshtml":
ret = anim.to_jshtml() # convert to embeddable Javascript/HTML animation
elif format == "gif":
anim.save(
file, writer=animation.PillowWriter(fps=1 / self.robot.dt)
) # convert to GIF
ret = None
elif format == "mp4":
anim.save(
file, writer=animation.FFMpegWriter(fps=1 / self.robot.dt)
) # convert to mp4/H264
ret = None
elif format is None:
# return the anim object
return anim
else:
raise ValueError("unknown format")
if ret is not None and file is not None:
with open(file, "w") as f:
f.write(ret)
ret = None
plt.close(fig)
return ret
def step(self, pause=None):
"""
Execute one timestep of the simulation

Performs the EKF prediction using the vehicle odometry, processes any
sensor observation to correct the state (extending it if a new landmark
is seen), and appends the result to the history list.
"""
# move the robot
odo = self.robot.step(pause=pause)
# =================================================================
# P R E D I C T I O N
# =================================================================
if self._est_vehicle:
# split the state vector and covariance into chunks for
# vehicle and map
xv_est = self._x_est[:3]
xm_est = self._x_est[3:]
Pvv_est = self._P_est[:3, :3]
Pmm_est = self._P_est[3:, 3:]
Pvm_est = self._P_est[:3, 3:]
else:
xm_est = self._x_est
Pmm_est = self._P_est
if self._est_vehicle:
# evaluate the state update function and the Jacobians
# if vehicle has uncertainty, predict its covariance
xv_pred = self.robot.f(xv_est, odo)
Fx = self.robot.Fx(xv_est, odo)
Fv = self.robot.Fv(xv_est, odo)
Pvv_pred = Fx @ Pvv_est @ Fx.T + Fv @ self.V_est @ Fv.T
else:
# otherwise we just take the true robot state
xv_pred = self._robot.x
if self._est_ekf_map:
if self._est_vehicle:
# SLAM case, compute the correlations
Pvm_pred = Fx @ Pvm_est
Pmm_pred = Pmm_est
xm_pred = xm_est
# put the chunks back together again
if self._est_vehicle and not self._est_ekf_map:
# vehicle only
x_pred = xv_pred
P_pred = Pvv_pred
elif not self._est_vehicle and self._est_ekf_map:
# map only
x_pred = xm_pred
P_pred = Pmm_pred
elif self._est_vehicle and self._est_ekf_map:
# vehicle and map
x_pred = np.r_[xv_pred, xm_pred]
# fmt: off
P_pred = np.block([
[Pvv_pred, Pvm_pred],
[Pvm_pred.T, Pmm_pred]
])
# fmt: on
# at this point we have:
# xv_pred the state of the vehicle to use to
# predict observations
# xm_pred the state of the map
# x_pred the full predicted state vector
# P_pred the full predicted covariance matrix
# initialize the variables that might be computed during
# the update phase
doUpdatePhase = False
# disp('x_pred:') x_pred'
# =================================================================
# P R O C E S S O B S E R V A T I O N S
# =================================================================
if self.sensor is not None:
# read the sensor
z, lm_id = self.sensor.reading()
sensorReading = z is not None
else:
lm_id = None # keep history saving happy
z = None
sensorReading = False
if sensorReading:
# here for MBL, MM, SLAM
# compute the innovation
z_pred = self.sensor.h(xv_pred, lm_id)
innov = np.array([z[0] - z_pred[0], base.wrap_mpi_pi(z[1] - z_pred[1])])
if self._est_ekf_map:
# the ekf_map is estimated MM or SLAM case
if self._isseenbefore(lm_id):
# landmark is previously seen
# get previous estimate of its state
jx = self.landmark_mindex(lm_id)
xf = xm_pred[jx : jx + 2]
# compute Jacobian for this particular landmark
# xf = self.sensor.g(xv_pred, z) # HACK
Hx_k = self.sensor.Hp(xv_pred, xf)
z_pred = self.sensor.h(xv_pred, xf)
innov = np.array(
[z[0] - z_pred[0], base.wrap_mpi_pi(z[1] - z_pred[1])]
)
# create the Jacobian for all landmarks
Hx = np.zeros((2, len(xm_pred)))
Hx[:, jx : jx + 2] = Hx_k
Hw = self.sensor.Hw(xv_pred, xf)
if self._est_vehicle:
# concatenate Hx for vehicle and ekf_map
Hxv = self.sensor.Hx(xv_pred, xf)
Hx = np.block([Hxv, Hx])
self._landmark_increment(lm_id) # update the count
if self._verbose:
print(
f"landmark {lm_id} seen"
f" {self._landmark_count(lm_id)} times,"
f" state_idx={self.landmark_index(lm_id)}"
)
doUpdatePhase = True
else:
# new landmark, seen for the first time
# extend the state vector and covariance
x_pred, P_pred = self._extend_map(
P_pred, xv_pred, xm_pred, z, lm_id
)
# if lm_id == 17:
# print(P_pred)
# # print(x_pred[-2:], self._sensor._map.landmark(17), base.norm(x_pred[-2:] - self._sensor._map.landmark(17)))
self._landmark_add(lm_id)
if self._verbose:
print(
f"landmark {lm_id} seen for first time,"
f" state_idx={self.landmark_index(lm_id)}"
)
doUpdatePhase = False
else:
# LBL
Hx = self.sensor.Hx(xv_pred, lm_id)
Hw = self.sensor.Hw(xv_pred, lm_id)
doUpdatePhase = True
else:
innov = None
# doUpdatePhase flag indicates whether or not to do
# the update phase of the filter
#
# DR always false
# map-based localization if sensor reading
# map creation if sensor reading & not first
# sighting
# SLAM if sighting of a previously
# seen landmark
if doUpdatePhase:
# disp('do update\n')
# # we have innovation, update state and covariance
# compute x_est and P_est
# compute innovation covariance
S = Hx @ P_pred @ Hx.T + Hw @ self._W_est @ Hw.T
# compute the Kalman gain
K = P_pred @ Hx.T @ np.linalg.inv(S)
# update the state vector
x_est = x_pred + K @ innov
if self._est_vehicle:
# wrap heading state for a vehicle
x_est[2] = base.wrap_mpi_pi(x_est[2])
# update the covariance
if self._joseph:
# we use the Joseph form
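# P = (I - K H) P (I - K H)' + K W K' which, unlike the simple
# form, preserves the symmetry and positive semidefiniteness of P
# in the presence of round-off error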
I = np.eye(P_pred.shape[0])
P_est = (I - K @ Hx) @ P_pred @ (I - K @ Hx).T + K @ self._W_est @ K.T
else:
P_est = P_pred - K @ S @ K.T
# enforce P to be symmetric
P_est = 0.5 * (P_est + P_est.T)
else:
# no update phase, estimate is same as prediction
x_est = x_pred
P_est = P_pred
S = None
K = None
self._x_est = x_est
self._P_est = P_est
if self._keep_history:
hist = self._htuple(
self.robot._t,
x_est.copy(),
odo.copy(),
P_est.copy(),
innov.copy() if innov is not None else None,
S.copy() if S is not None else None,
K.copy() if K is not None else None,
lm_id if lm_id is not None else -1,
z.copy() if z is not None else None,
)
self._history.append(hist)
## landmark management
def _isseenbefore(self, lm_id):
# _landmarks[id][0] is the order in which the landmark was first seen
# _landmarks[id][1] is the occurrence count
return lm_id in self._landmarks
def _landmark_increment(self, lm_id):
self._landmarks[lm_id][1] += 1 # update the count
def _landmark_count(self, lm_id):
return self._landmarks[lm_id][1]
def _landmark_add(self, lm_id):
self._landmarks[lm_id] = [len(self._landmarks), 1]
def _extend_map(self, P, xv, xm, z, lm_id):
# this is a new landmark, we haven't seen it before
# estimate position of landmark in the world based on
# noisy sensor reading and current vehicle pose
# M = None
# estimate its position based on observation and vehicle state
xf = self.sensor.g(xv, z)
# append this estimate to the state vector
if self._est_vehicle:
x_ext = np.r_[xv, xm, xf]
else:
x_ext = np.r_[xm, xf]
# get the Jacobian for the new landmark
Gz = self.sensor.Gz(xv, z)
# extend the covariance matrix
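# the insertion Jacobian Yz maps the previous covariance and the
# sensor noise covariance into the extended state space:
#   P_ext = Yz @ blockdiag(P, W_est) @ Yz.T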
n = len(self._x_est)
if self._est_vehicle:
# estimating vehicle state
Gx = self.sensor.Gx(xv, z)
# fmt: off
Yz = np.block([
[np.eye(n), np.zeros((n, 2)) ],
[Gx, np.zeros((2, n-3)), Gz]
])
# fmt: on
else:
# estimating landmarks only
# P_ext = block_diag(P, Gz @ self._W_est @ Gz.T)
# fmt: off
Yz = np.block([
[np.eye(n), np.zeros((n, 2)) ],
[np.zeros((2, n)), Gz]
])
# fmt: on
P_ext = Yz @ block_diag(P, self._W_est) @ Yz.T
return x_ext, P_ext
def get_t(self):
"""
Get time from simulation
:return: simulation time vector
:rtype: ndarray(n)
Return simulation time vector, starts at zero. The timestep is an
attribute of the ``robot`` object.
:seealso: :meth:`run` :meth:`history`
"""
return np.array([h.t for h in self._history])
def get_xyt(self):
r"""
Get estimated vehicle trajectory
:return: vehicle trajectory where each row is configuration :math:`(x, y, \theta)`
:rtype: ndarray(n,3)
:seealso: :meth:`plot_xy` :meth:`run` :meth:`history`
"""
if self._est_vehicle:
xyt = np.array([h.xest[:3] for h in self._history])
else:
xyt = None
return xyt
def plot_xy(self, *args, block=None, **kwargs):
"""
Plot estimated vehicle position
:param args: position arguments passed to :meth:`~matplotlib.axes.Axes.plot`
:param kwargs: keywords arguments passed to :meth:`~matplotlib.axes.Axes.plot`
:param block: hold plot until figure is closed, defaults to None
:type block: bool, optional
Plot the estimated vehicle path in the xy-plane.
:seealso: :meth:`get_xyt` :meth:`plot_error` :meth:`plot_ellipse` :meth:`plot_P`
:meth:`run` :meth:`history`
"""
if not args and "color" not in kwargs:
kwargs["color"] = "r"
xyt = self.get_xyt()
plt.plot(xyt[:, 0], xyt[:, 1], *args, **kwargs)
if block is not None:
plt.show(block=block)
def plot_ellipse(self, confidence=0.95, N=10, block=None, **kwargs):
"""
Plot uncertainty ellipses
:param confidence: ellipse confidence interval, defaults to 0.95
:type confidence: float, optional
:param N: number of ellipses to plot, defaults to 10
:type N: int, optional
:param block: hold plot until figure is closed, defaults to None
:type block: bool, optional
:param kwargs: arguments passed to :meth:`spatialmath.base.graphics.plot_ellipse`
Plot ``N`` uncertainty ellipses spaced evenly along the trajectory.
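For example, a minimal sketch of overlaying 20 filled ellipses at the 99%
confidence level on the estimated path::

ekf.plot_xy()
ekf.plot_ellipse(N=20, confidence=0.99, filled=True, facecolor="g", alpha=0.3)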
:seealso: :meth:`get_P` :meth:`run` :meth:`history`
"""
nhist = len(self._history)
if "label" in kwargs:
label = kwargs["label"]
del kwargs["label"]
else:
label = f"{confidence*100:.3g}% confidence"
for k in np.linspace(0, nhist - 1, N):
k = round(k)
h = self._history[k]
if k == 0:
base.plot_ellipse(
h.P[:2, :2],
centre=h.xest[:2],
confidence=confidence,
label=label,
inverted=True,
**kwargs,
)
else:
base.plot_ellipse(
h.P[:2, :2],
centre=h.xest[:2],
confidence=confidence,
inverted=True,
**kwargs,
)
if block is not None:
plt.show(block=block)
def plot_error(self, bgcolor="r", confidence=0.95, ax=None, block=None, **kwargs):
r"""
Plot error with uncertainty bounds
:param bgcolor: background color, defaults to 'r'
:type bgcolor: str, optional
:param confidence: confidence interval, defaults to 0.95
:type confidence: float, optional
:param block: hold plot until figure is closed, defaults to None
:type block: bool, optional
Plot the error between actual and estimated vehicle
path :math:`(x, y, \theta)` versus time as three stacked plots.
Heading error is wrapped into the range :math:`[-\pi,\pi)`
Behind each line draw a shaded polygon ``bgcolor`` showing the specified
``confidence`` bounds based on the covariance at each time step.
Ideally the error lines should lie within the shaded polygon for the given
``confidence`` fraction of the time.
.. note:: Observations will decrease the uncertainty while periods of dead-reckoning increase it.
:seealso: :meth:`get_P` :meth:`run` :meth:`history`
"""
error = []
bounds = []
ppf = chi2.ppf(confidence, df=2)
x_gt = self.robot.x_hist
for k in range(len(self.history)):
hk = self.history[k]
# error is true - estimated
e = x_gt[k, :] - hk.xest[:3]
e[2] = base.wrap_mpi_pi(e[2])
error.append(e)
P = np.diag(hk.P)
bounds.append(np.sqrt(ppf * P[:3]))
error = np.array(error)
bounds = np.array(bounds)
t = self.get_t()
if ax is None:
fig, axes = plt.subplots(3)
else:
axes = ax[:3]
labels = ["x", "y", r"$\theta$"]
for k, ax in enumerate(axes):
if confidence is not None:
edge = np.array(
[
np.r_[t, t[::-1]],
np.r_[bounds[:, k], -bounds[::-1, k]],
]
)
polygon = plt.Polygon(
edge.T, closed=True, facecolor=bgcolor, edgecolor="none", alpha=0.3
)
ax.add_patch(polygon)
ax.plot(t, error[:, k], **kwargs)
ax.grid(True)
ax.set_ylabel(labels[k] + " error")
ax.set_xlim(0, t[-1])
if block is not None:
plt.show(block=block)
# subplot(opt.nplots*100+12)
# if opt.confidence
# edge = [pxy(:,2); -pxy(end:-1:1,2)];
# h = patch(t, edge, opt.color);
# set(h, 'EdgeColor', 'none', 'FaceAlpha', 0.3);
# end
# hold on
# plot(err(:,2), args{:});
# hold off
# grid
# ylabel('y error')
# subplot(opt.nplots*100+13)
# if opt.confidence
# edge = [pxy(:,3); -pxy(end:-1:1,3)];
# h = patch(t, edge, opt.color);
# set(h, 'EdgeColor', 'none', 'FaceAlpha', 0.3);
# end
# hold on
# plot(err(:,3), args{:});
# hold off
# grid
# xlabel('Time step')
# ylabel('\theta error')
def get_map(self):
"""
Get estimated map
:return: landmark coordinates :math:`(x, y)`
:rtype: ndarray(n,2)
Landmarks are returned in the order they were first observed.
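For example, a minimal sketch of comparing the estimated landmarks with the
true map, assuming ``map`` is the :class:`LandmarkMap` used by the sensor::

map.plot()                           # true landmark positions
xy = ekf.get_map()
plt.plot(xy[:, 0], xy[:, 1], "r+")   # estimated landmark positions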
:seealso: :meth:`landmarks` :meth:`run` :meth:`history`
"""
xy = []
for lm_id, (order, nseen) in self._landmarks.items():
# order is the position of this landmark in the landmark part of
# the state vector; convert it to an index into the full state
# vector, allowing for the vehicle state if that is also estimated
jx = order * 2
if self._est_vehicle:
jx += 3
xf = self._x_est[jx : jx + 2]
xy.append(xf)
return np.array(xy)
def plot_map(self, marker=None, ellipse=None, confidence=0.95, block=None):
"""
Plot estimated landmarks
:param marker: plot marker for landmark, arguments passed to :meth:`~matplotlib.axes.Axes.plot`, defaults to "r+"
:type marker: dict, optional
:param ellipse: arguments passed to :meth:`~spatialmath.base.graphics.plot_ellipse`, defaults to None
:type ellipse: dict, optional
:param confidence: ellipse confidence interval, defaults to 0.95
:type confidence: float, optional
:param block: hold plot until figure is closed, defaults to None
:type block: bool, optional
Plot a marker and covariance ellipses for each estimated landmark.
:seealso: :meth:`get_map` :meth:`run` :meth:`history`
"""
if marker is None:
marker = {
"marker": "+",
"markersize": 10,
"markerfacecolor": "red",
"linewidth": 0,
}
xm = self._x_est
P = self._P_est
if self._est_vehicle:
xm = xm[3:]
P = P[3:, 3:]
# mark the estimate as a point
xm = xm.reshape((-1, 2)) # arrange as Nx2
plt.plot(xm[:, 0], xm[:, 1], label="estimated landmark", **marker)
# add an ellipse
if ellipse is not None:
for i in range(xm.shape[0]):
Pi = P[i * 2 : i * 2 + 2, i * 2 : i * 2 + 2]
# put ellipse in the legend only once
if i == 0:
base.plot_ellipse(
Pi,
centre=xm[i, :],
confidence=confidence,
inverted=True,
label=f"{confidence*100:.3g}% confidence",
**ellipse,
)
else:
base.plot_ellipse(
Pi,
centre=xm[i, :],
confidence=confidence,
inverted=True,
**ellipse,
)
# plot_ellipse( P * chi2inv_rtb(opt.confidence, 2), xf, args{:});
if block is not None:
plt.show(block=block)
def get_P(self, k=None):
"""
Get covariance matrices from simulation
:param k: timestep, defaults to None
:type k: int, optional
:return: covariance matrix
:rtype: ndarray(n,n) or list of ndarray(n,n)
If ``k`` is given return covariance from simulation timestep ``k``, else
return a list of all covariance matrices.
:seealso: :meth:`get_Pnorm` :meth:`run` :meth:`history`
"""
if k is not None:
return self._history[k].P
else:
return [h.P for h in self._history]
def get_Pnorm(self, k=None):
"""
Get covariance norm from simulation
:param k: timestep, defaults to None
:type k: int, optional
:return: covariance matrix norm
:rtype: float or ndarray(n)
If ``k`` is given return the covariance norm from simulation timestep ``k``,
else return all covariance norms as a 1D NumPy array.  The norm used is
:math:`\sqrt{\det P}`, which is related to the volume of the uncertainty ellipsoid.
:seealso: :meth:`get_P` :meth:`run` :meth:`history`
"""
if k is not None:
return np.sqrt(np.linalg.det(self._history[k].P))
else:
p = [np.sqrt(np.linalg.det(h.P)) for h in self._history]
return np.array(p)
def disp_P(self, P, colorbar=False):
"""
Display covariance matrix
:param P: covariance matrix
:type P: ndarray(n,n)
:param colorbar: add a colorbar, defaults to False
:type colorbar: bool or dict, optional
Plot the elements of the covariance matrix as an image. If ``colorbar``
is True add a color bar, and if ``colorbar`` is a dict add a color bar with
these options passed to :func:`~matplotlib.pyplot.colorbar`.
.. note:: A log scale is used.
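For example, a minimal sketch of displaying the final covariance of a SLAM
run with a colorbar::

ekf.disp_P(ekf.P_est, colorbar=True)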
:seealso: :meth:`~matplotlib.axes.Axes.imshow` :func:`matplotlib.pyplot.colorbar`
"""
z = np.log10(abs(P))
mn = min(z[~np.isinf(z)])
z[np.isinf(z)] = mn
plt.xlabel("State")
plt.ylabel("State")
plt.imshow(z, cmap="Reds")
if colorbar is True:
plt.colorbar(label="log covariance")
elif isinstance(colorbar, dict):
plt.colorbar(**colorbar)
if __name__ == "__main__":
from roboticstoolbox import *
V = np.diag([0.02, np.deg2rad(0.5)]) ** 2
robot = Bicycle(covar=V, animation="car")
robot.control = RandomPath(workspace=10)
P0 = np.diag([1, 1, 1])
V = np.diag([1, 1])
ekf = EKF(robot=(robot, V), P0=P0, animate=False)
print(ekf)
ekf.run_animation(T=20, format="mp4", file="ekf.mp4")
# from roboticstoolbox import Bicycle
# ### RVC2: Chapter 6
# ## 6.1 Dead reckoning
# ## 6.1.1 Modeling the vehicle
# V = np.diag(np.r_[0.02, 0.5*pi/180] ** 2);
# veh = Bicycle(covar=V)
# odo = veh.step(1, 0.3)
# print(veh.x)
# veh.f([0, 0, 0], odo)
# # veh.add_driver( RandomPath(10) )
# # veh.run()
# ### 6.1.2 Estimating pose
# # veh.Fx( [0,0,0], [0.5, 0.1] )
# P0 = np.diag(np.r_[0.005, 0.005, 0.001]**2);
# ekf = EKF(veh, V, P0);
# ekf.run(1000);
# veh.plot_xy()
# ekf.plot_xy('r')
# P700 = ekf.history(700).P
# sqrt(P700(1,1))
# ekf.plot_ellipse('g')
# # 6.2 Map-based localization
# # randinit
# # map = LandmarkMap(20, 10)
# # map.plot()
# # W = diag([0.1, 1*pi/180].^2);
# # sensor = RangeBearingSensor(veh, map, 'covar', W)
# # [z,i] = sensor.reading()
# # map.landmark(17)
# # randinit
# # map = LandmarkMap(20);
# # veh = Bicycle('covar', V);
# # veh.add_driver( RandomPath(map.dim) );
# # sensor = RangeBearingSensor(veh, map, 'covar', W, 'angle', ...
# # [-pi/2 pi/2], 'range', 4, 'animate');
# # ekf = EKF(veh, V, P0, sensor, W, map);
# # ekf.run(1000);
# # map.plot()
# # veh.plot_xy();
# # ekf.plot_xy('r');
# # ekf.plot_ellipse('k')
# # 6.3 Creating a map
# # randinit
# # map = LandmarkMap(20);
# # veh = Bicycle(); error free vehicle
# # veh.add_driver( RandomPath(map.dim) );
# # W = diag([0.1, 1*pi/180].^2);
# # sensor = RangeBearingSensor(veh, map, 'covar', W);
# # ekf = EKF(veh, [], [], sensor, W, []);
# # ekf.run(1000);
# # map.plot();
# # ekf.plot_map('g');
# # veh.plot_xy('b');
# # ekf.landmarks(:,6)
# # ekf.x_est(19:20)'
# # ekf.P_est(19:20,19:20)
# # 6.4 EKF SLAM
# # randinit
# # P0 = diag([.01, .01, 0.005].^2);
# # map = LandmarkMap(20);
# # veh = Bicycle('covar', V);
# # veh.add_driver( RandomPath(map.dim) );
# # sensor = RangeBearingSensor(veh, map, 'covar', W);
# # ekf = EKF(veh, V, P0, sensor, W, []);
# # ekf.run(1000);
# # map.plot();
# # ekf.plot_map('g');
# # ekf.plot_xy('r');
# # veh.plot_xy('b');
# # 6.6 Pose-graph SLAM
# # syms x_i y_i theta_i x_j y_j theta_j x_m y_m theta_m assume real
# # xi_e = inv( SE2(x_m, y_m, theta_m) ) * inv( SE2(x_i, y_i, theta_i) ) * SE2(x_j, y_j, theta_j);
# # fk = simplify(xi_e.xyt);
# # jacobian ( fk, [x_i y_i theta_i] );
# # Ai = simplify (ans)
# # pg = PoseGraph('pg1.g2o')
# # clf
# # pg.plot()
# # pg.optimize('animate')
# # pg = PoseGraph('killian-small.toro');
# # pg.plot()
# # pg.optimize()
# # 6.7 Particle filter
# # randinit
# # map = LandmarkMap(20);
# # W = diag([0.1, 1*pi/180].^2);
# # veh = Bicycle('covar', V);
# # veh.add_driver( RandomPath(10) );
# # V = diag([0.005, 0.5*pi/180].^2);
# # sensor = RangeBearingSensor(veh, map, 'covar', W);
# # Q = diag([0.1, 0.1, 1*pi/180]).^2;
# # L = diag([0.1 0.1]);
# # pf = ParticleFilter(veh, sensor, Q, L, 1000);
# # pf.run(1000);
# # map.plot();
# # veh.plot_xy('b');
# # clf
# # pf.plot_xy('r');
# # clf
# # plot(pf.std(1:100,:))
# # clf
# # pf.plot_pdf()
# # 6.8 Application: Scanning laser rangefinder
# # Laser odometry
# # pg = PoseGraph('killian.g2o', 'laser');
# # [r, theta] = pg.scan(2580);
# # about r theta
# # clf
# # polar(theta, r)
# # [x,y] = pol2cart (theta, r);
# # plot (x, y, '.')
# # p2580 = pg.scanxy(2580);
# # p2581 = pg.scanxy(2581);
# # about p2580
# # T = icp( p2581, p2580, 'verbose' , 'T0', transl2(0.5, 0), 'distthresh', 3)
# # pg.time(2581)-pg.time(2580)
# # Laser-based map building
# # map = pg.scanmap();
# # pg.plot_occgrid(map)