Source code for roboticstoolbox.mobile.ParticleFilter

#!/usr/bin/env python3

"""
Python EKF Planner
@Author: Peter Corke, original MATLAB code and Python version
@Author: Kristian Gibson, initial MATLAB port

Based on code by Paul Newman, Oxford University, 
http://www.robots.ox.ac.uk/~pnewman
"""

from collections import namedtuple

import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
from spatialmath import base

"""
Monte-carlo based localisation for estimating vehicle pose based on
odometry and observations of known landmarks.
"""

# TODO: refactor this and EKF, RNG, history, common plots, animation, movie

[docs]class ParticleFilter:
[docs] def __init__(self, robot, sensor, R, L, nparticles=500, seed=0, x0=None, verbose=False, animate=False, history=True, workspace=None): """ Particle filter :param robot: robot motion model :type robot: :class:`VehicleBase` subclass, :param sensor: vehicle mounted sensor model :type sensor: :class:`SensorBase` subclass :param R: covariance of the zero-mean Gaussian noise added to the particles at each step (diffusion) :type R: ndarray(3,3) :param L: covariance used in the sensor likelihood model :type L: ndarray(2,2) :param nparticles: number of particles, defaults to 500 :type nparticles: int, optional :param seed: random number seed, defaults to 0 :type seed: int, optional :param x0: initial state, defaults to [0, 0, 0] :type x0: array_like(3), optional :param verbose: display extra debug information, defaults to False :type verbose: bool, optional :param history: retain step-by-step history, defaults to True :type history: bool, optional :param workspace: dimension of workspace, see :func:`~spatialmath.base.graphics.expand_dims` :type workspace: scalar, array_like(2), array_like(4) This class implements a Monte-Carlo estimator or particle filter for vehicle state, based on odometry, a landmark map, and landmark observations. The state of each particle is a possible vehicle configuration :math:`(x,y,\theta)`. Bootstrap particle resampling is used. The working area is defined by ``workspace`` or inherited from the landmark map attached to the ``sensor`` (see :func:`~spatialmath.base.graphics.expand_dims`): ============== ======= ======= ``workspace`` x-range y-range ============== ======= ======= A (scalar) -A:A -A:A [A, B] A:B A:B [A, B, C, D] A:B C:D ============== ======= ======= Particles are initially distributed uniform randomly over this area. Example:: V = np.diag([0.02, np.radians(0.5)]) ** 2 robot = Bicycle(covar=V, animation="car", workspace=10) robot.control = RandomPath(workspace=robot) map = LandmarkMap(nlandmarks=20, workspace=robot.workspace) W = np.diag([0.1, np.radians(1)]) ** 2 sensor = RangeBearingSensor(robot, map, covar=W, plot=True) R = np.diag([0.1, 0.1, np.radians(1)]) ** 2 L = np.diag([0.1, 0.1]) pf = ParticleFilter(robot, sensor, R, L, nparticles=1000) pf.run(T=10) map.plot() robot.plot_xy() pf.plot_xy() plt.plot(pf.get_std()[:100,:]) .. note:: Set ``seed=0`` to get different behaviour from run to run. :seealso: :meth:`run` """ self._robot = robot self._sensor = sensor self.R = R self.L = L self.nparticles = nparticles self._animate = animate # self.dim = sensor.map.dim self._history = [] self.x = () self.weight = () self.w0 = 0.05 self._x0 = x0 # create a private random number stream if required self._random = np.random.default_rng(seed) self._seed = seed self._keep_history = history # keep history self._htuple = namedtuple("PFlog", "t odo xest std weights") if workspace is not None: self._dim = base.expand_dims(workspace) else: self._dim = sensor.map.workspace self._workspace = self.robot.workspace self._init()
def __str__(self): #ParticleFilter.char Convert to string # # PF.char() is a string representing the state of the ParticleFilter # object in human-readable form. # # See also ParticleFilter.display. def indent(s, n=2): spaces = ' ' * n return s.replace('\n', '\n' + spaces) s = f"ParticleFilter object: {self.nparticles} particles" s += '\nR: ' + base.array2str(self.R) s += '\nL: ' + base.array2str(self.L) if self.robot is not None: s += indent("\nrobot: " + str(self.robot)) if self.sensor is not None: s += indent("\nsensor: " + str(self.sensor)) return s @property def robot(self): """ Get robot object :return: robot used in simulation :rtype: :class:`VehicleBase` subclass """ return self._robot @property def sensor(self): """ Get sensor object :return: sensor used in simulation :rtype: :class:`SensorBase` subclass """ return self._sensor @property def map(self): """ Get map object :return: map used in simulation :rtype: :class:`LandmarkMap` subclass """ return self._map @property def verbose(self): """ Get verbosity state :return: verbosity :rtype: bool """ return self._verbose @property def history(self): """ Get EKF simulation history :return: simulation history :rtype: list of namedtuples At each simulation timestep a namedtuple of is appended to the history list. It contains, for that time step, estimated state and covariance, and sensor observation. :seealso: :meth:`get_t` :meth:`get_xy` :meth:`get_std` :meth:`get_Pnorm` """ return self._history @property def workspace(self): """ Size of robot workspace :return: workspace bounds [xmin, xmax, ymin, ymax] :rtype: ndarray(4) Returns the bounds of the workspace as specified by constructor option ``workspace`` """ return self._workspace @property def random(self): """ Get private random number generator :return: NumPy random number generator :rtype: :class:`numpy.random.Generator` Has methods including: - ``integers(low, high, size, endpoint)`` - ``random(size)`` - ``uniform`` - ``normal(mean, std, size)`` - ``multivariate_normal(mean, covar, size)`` The generator is initialized with the seed provided at constructor time every time ``init`` is called. """ return self._random def _init(self, x0=None): #ParticleFilter.init Initialize the particle filter # # PF.init() initializes the particle distribution and clears the # history. # # Notes:: # - If initial particle states were given to the constructor the states are # set to this value, else a random distribution over the map is used. # - Invoked by the run() method. self.robot.init() self.sensor.init() #clear the history self._history = [] # create a new private random number generator if self._seed is not None: self._random = np.random.default_rng(self._seed) self._t = 0 # initialize particles if x0 is None: x0 = self._x0 if x0 is None: # create initial particle distribution as uniformly randomly distributed # over the map workspace and heading angles x = self.random.uniform(self.workspace[0], self.workspace[1], size=(self.nparticles,)) y = self.random.uniform(self.workspace[2], self.workspace[3], size=(self.nparticles,)) t = self.random.uniform(-np.pi, np.pi, size=(self.nparticles,)) self.x = np.c_[x, y, t] self.weight = np.ones((self.nparticles,))
[docs] def run(self, T=10, x0=None): """ Run the particle filter simulation :param T: maximum simulation time in seconds :type T: float :param animate: animate motion of vehicle, defaults to False :type animate: bool, optional :param movie: name of movie file to create, defaults to None :type movie: str, optional Simulates the motion of a vehicle (under the control of a driving agent) and the EKF estimator. The steps are: - initialize the filter, vehicle and vehicle driver agent, sensor - for each time step: - step the vehicle and its driver agent, obtain odometry - take a sensor reading - execute the EKF - save information as a namedtuple to the history list for later display :seealso: :meth:`history` :meth:`landmark` :meth:`landmarks` :meth:`get_xy` :meth:`get_t` :meth:`get_std` :meth:`plot_xy` """ self._init(x0=x0) # anim = Animate(opt.movie) # display the initial particles if self._animate: self.h, = plt.plot(self.x[:, 0], self.x[:, 1], 'go', zorder=0, markersize=3, markeredgecolor='none', alpha=0.3, label='particle') # set(self.h, 'Tag', 'particles') # self.robot.plot() # iterate over time for i in range(round(T / self.robot.dt)): self._step()
# anim.add() # anim.close() def _step(self): #fprintf('---- step\n') odo = self.robot.step() # move the robot # update the particles based on odometry self._predict(odo) # get a sensor reading z, lm_id = self.sensor.reading() if z is not None: self._observe(z, lm_id) #fprintf(' observe beacon #d\n', lm_id) self._select() # our estimate is simply the mean of the particles x_est = self.x.mean(axis=0) std_est = self.x.std(axis=0) # std is more complex for angles, need to account for 2pi wrap std_est[2] = np.sqrt(np.sum(base.angdiff(self.x[:,2], x_est[2]) ** 2)) / (self.nparticles-1) # display the updated particles # set(self.h, 'Xdata', self.x(:,1), 'Ydata', self.x(:,2), 'Zdata', self.x(:,3)) if self._animate: self.h.set_xdata(self.x[:, 0]) self.h.set_ydata(self.x[:, 1]) # if ~isempty(self.anim) # self.anim.add() if self._keep_history: hist = self._htuple( self.robot._t, odo.copy(), x_est, std_est, self.weight.copy() ) self._history.append(hist)
[docs] def plot_pdf(self): """ Plot particle PDF Displays a discrete PDF of vehicle position. Creates a 3D plot where the x- and y-axes are the estimated vehicle position and the z-axis is the particle weight. Each particle is represented by a a vertical line segment of height equal to particle weight. """ ax = base.plotvol3() for (x, y, t), weight in zip(self.x, self.weight): # ax.plot([x, x], [y, y], [0, weight], 'r') ax.plot([x, x], [y, y], [0, weight], 'skyblue', linewidth=3) ax.plot(x, y, weight, 'k.', markersize=6) plt.grid(True) plt.xlabel('X') plt.ylabel('Y') plt.xlim() ax.set_zlabel('particle weight') ax.view_init(29, 59)
def _predict(self, odo): # step 2 # update the particle state based on odometry and a random perturbation # Straightforward code: # # for i=1:self.nparticles # x = self.robot.f( self.x(i,:), odo)' + sqrt(self.R)*self.randn[2,0] # x[2] = angdiff(x[2]) # self.x(i,:) = x # # Vectorized code: self.x = self.robot.f(self.x, odo) + \ self.random.multivariate_normal((0, 0, 0), self.R, size=self.nparticles) self.x[:, 2] = base.angdiff(self.x[:, 2]) def _observe(self, z, lm_id): # step 3 # predict observation and score the particles # Straightforward code: # # for p = 1:self.nparticles # # what do we expect observation to be for this particle? # # use the sensor model h(.) # z_pred = self.sensor.h( self.x(p,:), lm_id) # # # how different is it # innov[0] = z[0] - z_pred[0] # innov[1] = angdiff(z[1], z_pred[1]) # # # get likelihood (new importance). Assume Gaussian but any PDF works! # # If predicted obs is very different from actual obs this score will be low # # ie. this particle is not very good at predicting the observation. # # A lower score means it is less likely to be selected for the next generation... # # The weight is never zero. # self.weight(p) = exp(-0.5*innov'*inv(self.L)*innov) + 0.05 # end # # Vectorized code: invL = np.linalg.inv(self.L) z_pred = self.sensor.h(self.x, lm_id) z_pred[:, 0] = z[0] - z_pred[:, 0] z_pred[:, 1] = base.angdiff(z[1], z_pred[:, 1]) LL = -0.5 * np.r_[invL[0,0], invL[1,1], 2*invL[0,1]] e = np.c_[z_pred[:, 0]**2, z_pred[:, 1]**2, z_pred[:,0] * z_pred[:, 1]] @ LL self.weight = np.exp(e) + self.w0 def _select(self): # step 4 # select particles based on their weights # # particles with large weights will occupy a greater percentage of the # y axis in a cummulative plot cdf = np.cumsum(self.weight) / self.weight.sum() # so randomly (uniform) choosing y values is more likely to correspond to # better particles... iselect = self.random.uniform(0, 1, size=(self.nparticles,)) # find the particle that corresponds to each y value (just a look up) interpfun = sp.interpolate.interp1d(cdf, np.arange(self.nparticles), assume_sorted=True, kind='nearest', fill_value='extrapolate') inextgen = interpfun(iselect).astype(np.int) # copy selected particles for next generation.. self.x = self.x[inextgen, :]
[docs] def get_t(self): """ Get time from simulation :return: simulation time vector :rtype: ndarray(n) Return simulation time vector, starts at zero. The timestep is an attribute of the ``robot`` object. """ return np.array([h.t for h in self._history])
[docs] def get_xyt(self): r""" Get estimated vehicle trajectory :return: vehicle trajectory where each row is configuration :math:`(x, y, \theta)` :rtype: ndarray(n,3) :seealso: :meth:`plot_xy` :meth:`run` :meth:`history` """ return np.array([h.xest[:2] for h in self._history])
[docs] def get_std(self): r""" Get standard deviation of particles :return: standard deviation of vehicle position estimate :rtype: ndarray(n,2) Return the standard deviation :math:`(\sigma_x, \sigma_y)` of the particle cloud at each time step. :seealso: :meth:`get_xyt` """ return np.array([h.std for h in self._history])
[docs] def plot_xy(self, block=False, **kwargs): r""" Plot estimated vehicle position :param args: position arguments passed to :meth:`~matplotlib.axes.Axes.plot` :param kwargs: keywords arguments passed to :meth:`~matplotlib.axes.Axes.plot` :param block: hold plot until figure is closed, defaults to False :type block: bool, optional Plot the estimated vehicle path in the xy-plane. :seealso: :meth:`get_xy` """ xyt = self.get_xyt() plt.plot(xyt[:, 0], xyt[:, 1], **kwargs)
# plt.show(block=block)