Source code for roboticstoolbox.mobile.ParticleFilter

#!/usr/bin/env python3

"""
Python EKF Planner
@Author: Peter Corke, original MATLAB code and Python version
@Author: Kristian Gibson, initial MATLAB port

Based on code by Paul Newman, Oxford University, 
http://www.robots.ox.ac.uk/~pnewman
"""

from collections import namedtuple

import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
from matplotlib import animation

import spatialmath.base as smb

"""
Monte-carlo based localisation for estimating vehicle pose based on
odometry and observations of known landmarks.
"""

# TODO: refactor this and EKF, RNG, history, common plots, animation, movie


[docs]class ParticleFilter:
[docs] def __init__( self, robot, sensor, R, L, nparticles=500, seed=0, x0=None, verbose=False, animate=False, history=True, workspace=None, ): """ Particle filter :param robot: robot motion model :type robot: :class:`VehicleBase` subclass, :param sensor: vehicle mounted sensor model :type sensor: :class:`SensorBase` subclass :param R: covariance of the zero-mean Gaussian noise added to the particles at each step (diffusion) :type R: ndarray(3,3) :param L: covariance used in the sensor likelihood model :type L: ndarray(2,2) :param nparticles: number of particles, defaults to 500 :type nparticles: int, optional :param seed: random number seed, defaults to 0 :type seed: int, optional :param x0: initial state, defaults to [0, 0, 0] :type x0: array_like(3), optional :param verbose: display extra debug information, defaults to False :type verbose: bool, optional :param history: retain step-by-step history, defaults to True :type history: bool, optional :param workspace: dimension of workspace, see :func:`~spatialmath.base.graphics.expand_dims` :type workspace: scalar, array_like(2), array_like(4) This class implements a Monte-Carlo estimator or particle filter for vehicle state, based on odometry, a landmark map, and landmark observations. The state of each particle is a possible vehicle configuration :math:`(x,y,\theta)`. Bootstrap particle resampling is used. The working area is defined by ``workspace`` or inherited from the landmark map attached to the ``sensor`` (see :func:`~spatialmath.base.graphics.expand_dims`): ============== ======= ======= ``workspace`` x-range y-range ============== ======= ======= A (scalar) -A:A -A:A [A, B] A:B A:B [A, B, C, D] A:B C:D ============== ======= ======= Particles are initially distributed uniform randomly over this area. Example:: V = np.diag([0.02, np.radians(0.5)]) ** 2 robot = Bicycle(covar=V, animation="car", workspace=10) robot.control = RandomPath(workspace=robot) map = LandmarkMap(nlandmarks=20, workspace=robot.workspace) W = np.diag([0.1, np.radians(1)]) ** 2 sensor = RangeBearingSensor(robot, map, covar=W, plot=True) R = np.diag([0.1, 0.1, np.radians(1)]) ** 2 L = np.diag([0.1, 0.1]) pf = ParticleFilter(robot, sensor, R, L, nparticles=1000) pf.run(T=10) map.plot() robot.plot_xy() pf.plot_xy() plt.plot(pf.get_std()[:100,:]) .. note:: Set ``seed=0`` to get different behaviour from run to run. :seealso: :meth:`run` """ self._robot = robot self._sensor = sensor self.R = R self.L = L self.nparticles = nparticles self._animate = animate # self.dim = sensor.map.dim self._history = [] self.x = () self.weight = () self.w0 = 0.05 self._x0 = x0 # create a private random number stream if required self._random = np.random.default_rng(seed) self._seed = seed self._keep_history = history # keep history self._htuple = namedtuple("PFlog", "t odo xest std weights") if workspace is not None: self._dim = smb.expand_dims(workspace) else: self._dim = sensor.map.workspace self._workspace = self.robot.workspace
# self._init() def __str__(self): # ParticleFilter.char Convert to string # # PF.char() is a string representing the state of the ParticleFilter # object in human-readable form. # # See also ParticleFilter.display. def indent(s, n=2): spaces = " " * n return s.replace("\n", "\n" + spaces) s = f"ParticleFilter object: {self.nparticles} particles" s += "\nR: " + smb.array2str(self.R) s += "\nL: " + smb.array2str(self.L) if self.robot is not None: s += indent("\nrobot: " + str(self.robot)) if self.sensor is not None: s += indent("\nsensor: " + str(self.sensor)) return s @property def robot(self): """ Get robot object :return: robot used in simulation :rtype: :class:`VehicleBase` subclass """ return self._robot @property def sensor(self): """ Get sensor object :return: sensor used in simulation :rtype: :class:`SensorBase` subclass """ return self._sensor @property def map(self): """ Get map object :return: map used in simulation :rtype: :class:`LandmarkMap` subclass """ return self._map @property def verbose(self): """ Get verbosity state :return: verbosity :rtype: bool """ return self._verbose @property def history(self): """ Get EKF simulation history :return: simulation history :rtype: list of namedtuples At each simulation timestep a namedtuple of is appended to the history list. It contains, for that time step, estimated state and covariance, and sensor observation. :seealso: :meth:`get_t` :meth:`get_xy` :meth:`get_std` :meth:`get_Pnorm` """ return self._history @property def workspace(self): """ Size of robot workspace :return: workspace bounds [xmin, xmax, ymin, ymax] :rtype: ndarray(4) Returns the bounds of the workspace as specified by constructor option ``workspace`` """ return self._workspace @property def random(self): """ Get private random number generator :return: NumPy random number generator :rtype: :class:`numpy.random.Generator` Has methods including: - ``integers(low, high, size, endpoint)`` - ``random(size)`` - ``uniform`` - ``normal(mean, std, size)`` - ``multivariate_normal(mean, covar, size)`` The generator is initialized with the seed provided at constructor time every time ``init`` is called. """ return self._random def _init(self, x0=None, animate=False, ax=None): # ParticleFilter.init Initialize the particle filter # # PF.init() initializes the particle distribution and clears the # history. # # Notes:: # - If initial particle states were given to the constructor the states are # set to this value, else a random distribution over the map is used. # - Invoked by the run() method. self.robot.init() self.sensor.init() # clear the history self._history = [] # create a new private random number generator if self._seed is not None: self._random = np.random.default_rng(self._seed) self._t = 0 # initialize particles if x0 is None: x0 = self._x0 if x0 is None: # create initial particle distribution as uniformly randomly distributed # over the map workspace and heading angles x = self.random.uniform( self.workspace[0], self.workspace[1], size=(self.nparticles,) ) y = self.random.uniform( self.workspace[2], self.workspace[3], size=(self.nparticles,) ) t = self.random.uniform(-np.pi, np.pi, size=(self.nparticles,)) self.x = np.c_[x, y, t] if animate: # display the initial particles (self.h,) = ax.plot( self.x[:, 0], self.x[:, 1], "go", zorder=0, markersize=3, markeredgecolor="none", alpha=0.3, label="particle", ) self.weight = np.ones((self.nparticles,))
[docs] def run(self, T=10, x0=None): """ Run the particle filter simulation :param T: maximum simulation time in seconds :type T: float :param x0: Initial state, defaults to value given to Vehicle constructor :type x0: array_like(3) or array_like(2) Simulates the motion of a vehicle (under the control of a driving agent) and the particle-filter estimator. The steps are: - initialize the filter, vehicle and vehicle driver agent, sensor - for each time step: - step the vehicle and its driver agent, obtain odometry - take a sensor reading - execute the EKF - save information as a namedtuple to the history list for later display :seealso: :meth:`history` :meth:`landmark` :meth:`landmarks` :meth:`get_xy` :meth:`get_t` :meth:`get_std` :meth:`plot_xy` """ self._init(x0=x0) # anim = Animate(opt.movie) # display the initial particles ax = smb.axes_logic(None, 2) if self._animate: (self.h,) = ax.plot( self.x[:, 0], self.x[:, 1], "go", zorder=0, markersize=3, markeredgecolor="none", alpha=0.3, label="particle", ) # set(self.h, 'Tag', 'particles') # self.robot.plot() # iterate over time import time for i in range(round(T / self.robot.dt)): self._step() # time.sleep(0.2) plt.pause(0.2)
# plt.draw() # anim.add() # anim.close()
[docs] def run_animation(self, T=10, x0=None, format=None, file=None): """ Run the particle filter simulation :param T: maximum simulation time in seconds :type T: float :param x0: Initial state, defaults to value given to Vehicle constructor :type x0: array_like(3) or array_like(2) :param format: Output format :type format: str, optional :param file: File name :type file: str, optional :return: Matplotlib animation object :rtype: :meth:`matplotlib.animation.FuncAnimation` Simulates the motion of a vehicle (under the control of a driving agent) and the particle-filter estimator and returns an animation in various formats:: ``format`` ``file`` description ============ ========= ============================ ``"html"`` str, None return HTML5 video ``"jshtml"`` str, None return JS+HTML video ``"gif"`` str return animated GIF ``"mp4"`` str return MP4/H264 video ``None`` return a ``FuncAnimation`` object The allowables types for ``file`` are given in the second column. A str value is the file name. If ``None`` is an option then return the video as a string. For the last case, a reference to the animation object must be held if used for animation in a Jupyter cell:: anim = robot.run_animation(T=20) The steps are: - initialize the filter, vehicle and vehicle driver agent, sensor - for each time step: - step the vehicle and its driver agent, obtain odometry - take a sensor reading - execute the EKF - save information as a namedtuple to the history list for later display :seealso: :meth:`history` :meth:`landmark` :meth:`landmarks` :meth:`get_xy` :meth:`get_t` :meth:`get_std` :meth:`plot_xy` """ fig, ax = plt.subplots() nframes = round(T / self.robot.dt) anim = animation.FuncAnimation( fig=fig, # func=lambda i: self._step(animate=True, pause=False), # init_func=lambda: self._init(animate=True), func=lambda i: self._step(), init_func=lambda: self._init(ax=ax, animate=True), frames=nframes, interval=self.robot.dt * 1000, blit=False, repeat=False, ) # anim._interval = self.dt*1000/2 # anim._repeat = True ret = None if format == "html": ret = anim.to_html5_video() # convert to embeddable HTML5 animation elif format == "jshtml": ret = anim.to_jshtml() # convert to embeddable Javascript/HTML animation elif format == "gif": anim.save( file, writer=animation.PillowWriter(fps=1 / self.dt) ) # convert to GIF ret = None elif format == "mp4": anim.save( file, writer=animation.FFMpegWriter(fps=1 / self.dt) ) # convert to mp4/H264 ret = None elif format == None: # return the anim object return anim else: raise ValueError("unknown format") if ret is not None and file is not None: with open(file, "w") as f: f.write(ret) ret = None plt.close(fig) return ret
# self._init(x0=x0) # # anim = Animate(opt.movie) # # display the initial particles # ax = smb.axes_logic(None, 2) # if self._animate: # (self.h,) = ax.plot( # self.x[:, 0], # self.x[:, 1], # "go", # zorder=0, # markersize=3, # markeredgecolor="none", # alpha=0.3, # label="particle", # ) # # set(self.h, 'Tag', 'particles') # # self.robot.plot() # # iterate over time # import time # for i in range(round(T / self.robot.dt)): # self._step() # # time.sleep(0.2) # plt.pause(0.2) # # plt.draw() # # anim.add() # # anim.close() def _step(self): # fprintf('---- step\n') odo = self.robot.step(animate=self._animate) # move the robot # update the particles based on odometry self._predict(odo) # get a sensor reading z, lm_id = self.sensor.reading() if z is not None: self._observe(z, lm_id) # fprintf(' observe beacon #d\n', lm_id) self._select() # our estimate is simply the mean of the particles x_est = self.x.mean(axis=0) std_est = self.x.std(axis=0) # std is more complex for angles, need to account for 2pi wrap std_est[2] = np.sqrt(np.sum(smb.angdiff(self.x[:, 2], x_est[2]) ** 2)) / ( self.nparticles - 1 ) # display the updated particles # set(self.h, 'Xdata', self.x(:,1), 'Ydata', self.x(:,2), 'Zdata', self.x(:,3)) if self._animate: self.h.set_xdata(self.x[:, 0]) self.h.set_ydata(self.x[:, 1]) # if ~isempty(self.anim) # self.anim.add() if self._keep_history: hist = self._htuple( self.robot._t, odo.copy(), x_est, std_est, self.weight.copy() ) self._history.append(hist)
[docs] def plot_pdf(self): """ Plot particle PDF Displays a discrete PDF of vehicle position. Creates a 3D plot where the x- and y-axes are the estimated vehicle position and the z-axis is the particle weight. Each particle is represented by a a vertical line segment of height equal to particle weight. """ ax = smb.plotvol3() for (x, y, t), weight in zip(self.x, self.weight): # ax.plot([x, x], [y, y], [0, weight], 'r') ax.plot([x, x], [y, y], [0, weight], "skyblue", linewidth=3) ax.plot(x, y, weight, "k.", markersize=6) ax.grid(True) ax.set_xlabel("X") ax.set_ylabel("Y") ax.set_xlim() ax.set_zlabel("particle weight") ax.view_init(29, 59)
def _predict(self, odo): # step 2 # update the particle state based on odometry and a random perturbation # Straightforward code: # # for i=1:self.nparticles # x = self.robot.f( self.x(i,:), odo)' + sqrt(self.R)*self.randn[2,0] # x[2] = angdiff(x[2]) # self.x(i,:) = x # # Vectorized code: self.x = self.robot.f(self.x, odo) + self.random.multivariate_normal( (0, 0, 0), self.R, size=self.nparticles ) self.x[:, 2] = smb.angdiff(self.x[:, 2]) def _observe(self, z, lm_id): # step 3 # predict observation and score the particles # Straightforward code: # # for p = 1:self.nparticles # # what do we expect observation to be for this particle? # # use the sensor model h(.) # z_pred = self.sensor.h( self.x(p,:), lm_id) # # # how different is it # innov[0] = z[0] - z_pred[0] # innov[1] = angdiff(z[1], z_pred[1]) # # # get likelihood (new importance). Assume Gaussian but any PDF works! # # If predicted obs is very different from actual obs this score will be low # # ie. this particle is not very good at predicting the observation. # # A lower score means it is less likely to be selected for the next generation... # # The weight is never zero. # self.weight(p) = exp(-0.5*innov'*inv(self.L)*innov) + 0.05 # end # # Vectorized code: invL = np.linalg.inv(self.L) z_pred = self.sensor.h(self.x, lm_id) z_pred[:, 0] = z[0] - z_pred[:, 0] z_pred[:, 1] = smb.angdiff(z[1], z_pred[:, 1]) LL = -0.5 * np.r_[invL[0, 0], invL[1, 1], 2 * invL[0, 1]] e = ( np.c_[z_pred[:, 0] ** 2, z_pred[:, 1] ** 2, z_pred[:, 0] * z_pred[:, 1]] @ LL ) self.weight = np.exp(e) + self.w0 def _select(self): # step 4 # select particles based on their weights # # particles with large weights will occupy a greater percentage of the # y axis in a cummulative plot cdf = np.cumsum(self.weight) / self.weight.sum() # so randomly (uniform) choosing y values is more likely to correspond to # better particles... iselect = self.random.uniform(0, 1, size=(self.nparticles,)) # find the particle that corresponds to each y value (just a look up) interpfun = sp.interpolate.interp1d( cdf, np.arange(self.nparticles), assume_sorted=True, kind="nearest", fill_value="extrapolate", ) inextgen = interpfun(iselect).astype(int) # copy selected particles for next generation.. self.x = self.x[inextgen, :]
[docs] def get_t(self): """ Get time from simulation :return: simulation time vector :rtype: ndarray(n) Return simulation time vector, starts at zero. The timestep is an attribute of the ``robot`` object. """ return np.array([h.t for h in self._history])
[docs] def get_xyt(self): r""" Get estimated vehicle trajectory :return: vehicle trajectory where each row is configuration :math:`(x, y, \theta)` :rtype: ndarray(n,3) :seealso: :meth:`plot_xy` :meth:`run` :meth:`history` """ return np.array([h.xest[:2] for h in self._history])
[docs] def get_std(self): r""" Get standard deviation of particles :return: standard deviation of vehicle position estimate :rtype: ndarray(n,2) Return the standard deviation :math:`(\sigma_x, \sigma_y)` of the particle cloud at each time step. :seealso: :meth:`get_xyt` """ return np.array([h.std for h in self._history])
[docs] def plot_xy(self, block=None, **kwargs): r""" Plot estimated vehicle position :param args: position arguments passed to :meth:`~matplotlib.axes.Axes.plot` :param kwargs: keywords arguments passed to :meth:`~matplotlib.axes.Axes.plot` :param block: hold plot until figure is closed, defaults to None :type block: bool, optional Plot the estimated vehicle path in the xy-plane. :seealso: :meth:`get_xy` """ xyt = self.get_xyt() plt.plot(xyt[:, 0], xyt[:, 1], **kwargs) if block is not None: plt.show(block=block)
if __name__ == "__main__": from roboticstoolbox import * map = LandmarkMap(20, workspace=10) V = np.diag([0.02, np.deg2rad(0.5)]) ** 2 robot = Bicycle(covar=V, animation="car", workspace=map) robot.control = RandomPath(workspace=map) W = np.diag([0.1, np.deg2rad(1)]) ** 2 sensor = RangeBearingSensor(robot, map, covar=W, plot=True) R = np.diag([0.1, 0.1, np.deg2rad(1)]) ** 2 L = np.diag([0.1, 0.1]) pf = ParticleFilter(robot, sensor=sensor, R=R, L=L, nparticles=1000, animate=True) pf.run(T=10)