Source code for

#!/usr/bin/env python3

Python EKF Planner
@Author: Peter Corke, original MATLAB code and Python version
@Author: Kristian Gibson, initial MATLAB port

Based on code by Paul Newman, Oxford University,

from collections import namedtuple

import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
from spatialmath import base

Monte-carlo based localisation for estimating vehicle pose based on
odometry and observations of known landmarks.

# TODO: refactor this and EKF, RNG, history, common plots, animation, movie

[docs]class ParticleFilter:
[docs] def __init__(self, robot, sensor, R, L, nparticles=500, seed=0, x0=None, verbose=False, animate=False, history=True, workspace=None): """ Particle filter :param robot: robot motion model :type robot: :class:`VehicleBase` subclass, :param sensor: vehicle mounted sensor model :type sensor: :class:`SensorBase` subclass :param R: covariance of the zero-mean Gaussian noise added to the particles at each step (diffusion) :type R: ndarray(3,3) :param L: covariance used in the sensor likelihood model :type L: ndarray(2,2) :param nparticles: number of particles, defaults to 500 :type nparticles: int, optional :param seed: random number seed, defaults to 0 :type seed: int, optional :param x0: initial state, defaults to [0, 0, 0] :type x0: array_like(3), optional :param verbose: display extra debug information, defaults to False :type verbose: bool, optional :param history: retain step-by-step history, defaults to True :type history: bool, optional :param workspace: dimension of workspace, see :func:`` :type workspace: scalar, array_like(2), array_like(4) This class implements a Monte-Carlo estimator or particle filter for vehicle state, based on odometry, a landmark map, and landmark observations. The state of each particle is a possible vehicle configuration :math:`(x,y,\theta)`. Bootstrap particle resampling is used. The working area is defined by ``workspace`` or inherited from the landmark map attached to the ``sensor`` (see :func:``): ============== ======= ======= ``workspace`` x-range y-range ============== ======= ======= A (scalar) -A:A -A:A [A, B] A:B A:B [A, B, C, D] A:B C:D ============== ======= ======= Particles are initially distributed uniform randomly over this area. Example:: V = np.diag([0.02, np.radians(0.5)]) ** 2 robot = Bicycle(covar=V, animation="car", workspace=10) robot.control = RandomPath(workspace=robot) map = LandmarkMap(nlandmarks=20, workspace=robot.workspace) W = np.diag([0.1, np.radians(1)]) ** 2 sensor = RangeBearingSensor(robot, map, covar=W, plot=True) R = np.diag([0.1, 0.1, np.radians(1)]) ** 2 L = np.diag([0.1, 0.1]) pf = ParticleFilter(robot, sensor, R, L, nparticles=1000) map.plot() robot.plot_xy() pf.plot_xy() plt.plot(pf.get_std()[:100,:]) .. note:: Set ``seed=0`` to get different behaviour from run to run. :seealso: :meth:`run` """ self._robot = robot self._sensor = sensor self.R = R self.L = L self.nparticles = nparticles self._animate = animate # self.dim = self._history = [] self.x = () self.weight = () self.w0 = 0.05 self._x0 = x0 # create a private random number stream if required self._random = np.random.default_rng(seed) self._seed = seed self._keep_history = history # keep history self._htuple = namedtuple("PFlog", "t odo xest std weights") if workspace is not None: self._dim = base.expand_dims(workspace) else: self._dim = self._workspace = self.robot.workspace self._init()
def __str__(self): #ParticleFilter.char Convert to string # # PF.char() is a string representing the state of the ParticleFilter # object in human-readable form. # # See also ParticleFilter.display. def indent(s, n=2): spaces = ' ' * n return s.replace('\n', '\n' + spaces) s = f"ParticleFilter object: {self.nparticles} particles" s += '\nR: ' + base.array2str(self.R) s += '\nL: ' + base.array2str(self.L) if self.robot is not None: s += indent("\nrobot: " + str(self.robot)) if self.sensor is not None: s += indent("\nsensor: " + str(self.sensor)) return s @property def robot(self): """ Get robot object :return: robot used in simulation :rtype: :class:`VehicleBase` subclass """ return self._robot @property def sensor(self): """ Get sensor object :return: sensor used in simulation :rtype: :class:`SensorBase` subclass """ return self._sensor @property def map(self): """ Get map object :return: map used in simulation :rtype: :class:`LandmarkMap` subclass """ return self._map @property def verbose(self): """ Get verbosity state :return: verbosity :rtype: bool """ return self._verbose @property def history(self): """ Get EKF simulation history :return: simulation history :rtype: list of namedtuples At each simulation timestep a namedtuple of is appended to the history list. It contains, for that time step, estimated state and covariance, and sensor observation. :seealso: :meth:`get_t` :meth:`get_xy` :meth:`get_std` :meth:`get_Pnorm` """ return self._history @property def workspace(self): """ Size of robot workspace :return: workspace bounds [xmin, xmax, ymin, ymax] :rtype: ndarray(4) Returns the bounds of the workspace as specified by constructor option ``workspace`` """ return self._workspace @property def random(self): """ Get private random number generator :return: NumPy random number generator :rtype: :class:`numpy.random.Generator` Has methods including: - ``integers(low, high, size, endpoint)`` - ``random(size)`` - ``uniform`` - ``normal(mean, std, size)`` - ``multivariate_normal(mean, covar, size)`` The generator is initialized with the seed provided at constructor time every time ``init`` is called. """ return self._random def _init(self, x0=None): #ParticleFilter.init Initialize the particle filter # # PF.init() initializes the particle distribution and clears the # history. # # Notes:: # - If initial particle states were given to the constructor the states are # set to this value, else a random distribution over the map is used. # - Invoked by the run() method. self.robot.init() self.sensor.init() #clear the history self._history = [] # create a new private random number generator if self._seed is not None: self._random = np.random.default_rng(self._seed) self._t = 0 # initialize particles if x0 is None: x0 = self._x0 if x0 is None: # create initial particle distribution as uniformly randomly distributed # over the map workspace and heading angles x = self.random.uniform(self.workspace[0], self.workspace[1], size=(self.nparticles,)) y = self.random.uniform(self.workspace[2], self.workspace[3], size=(self.nparticles,)) t = self.random.uniform(-np.pi, np.pi, size=(self.nparticles,)) self.x = np.c_[x, y, t] self.weight = np.ones((self.nparticles,))
[docs] def run(self, T=10, x0=None): """ Run the particle filter simulation :param T: maximum simulation time in seconds :type T: float :param animate: animate motion of vehicle, defaults to False :type animate: bool, optional :param movie: name of movie file to create, defaults to None :type movie: str, optional Simulates the motion of a vehicle (under the control of a driving agent) and the EKF estimator. The steps are: - initialize the filter, vehicle and vehicle driver agent, sensor - for each time step: - step the vehicle and its driver agent, obtain odometry - take a sensor reading - execute the EKF - save information as a namedtuple to the history list for later display :seealso: :meth:`history` :meth:`landmark` :meth:`landmarks` :meth:`get_xy` :meth:`get_t` :meth:`get_std` :meth:`plot_xy` """ self._init(x0=x0) # anim = Animate( # display the initial particles if self._animate: self.h, = plt.plot(self.x[:, 0], self.x[:, 1], 'go', zorder=0, markersize=3, markeredgecolor='none', alpha=0.3, label='particle') # set(self.h, 'Tag', 'particles') # self.robot.plot() # iterate over time for i in range(round(T / self.robot.dt)): self._step()
# anim.add() # anim.close() def _step(self): #fprintf('---- step\n') odo = self.robot.step() # move the robot # update the particles based on odometry self._predict(odo) # get a sensor reading z, lm_id = self.sensor.reading() if z is not None: self._observe(z, lm_id) #fprintf(' observe beacon #d\n', lm_id) self._select() # our estimate is simply the mean of the particles x_est = self.x.mean(axis=0) std_est = self.x.std(axis=0) # std is more complex for angles, need to account for 2pi wrap std_est[2] = np.sqrt(np.sum(base.angdiff(self.x[:,2], x_est[2]) ** 2)) / (self.nparticles-1) # display the updated particles # set(self.h, 'Xdata', self.x(:,1), 'Ydata', self.x(:,2), 'Zdata', self.x(:,3)) if self._animate: self.h.set_xdata(self.x[:, 0]) self.h.set_ydata(self.x[:, 1]) # if ~isempty(self.anim) # self.anim.add() if self._keep_history: hist = self._htuple( self.robot._t, odo.copy(), x_est, std_est, self.weight.copy() ) self._history.append(hist)
[docs] def plot_pdf(self): """ Plot particle PDF Displays a discrete PDF of vehicle position. Creates a 3D plot where the x- and y-axes are the estimated vehicle position and the z-axis is the particle weight. Each particle is represented by a a vertical line segment of height equal to particle weight. """ ax = base.plotvol3() for (x, y, t), weight in zip(self.x, self.weight): # ax.plot([x, x], [y, y], [0, weight], 'r') ax.plot([x, x], [y, y], [0, weight], 'skyblue', linewidth=3) ax.plot(x, y, weight, 'k.', markersize=6) plt.grid(True) plt.xlabel('X') plt.ylabel('Y') plt.xlim() ax.set_zlabel('particle weight') ax.view_init(29, 59)
def _predict(self, odo): # step 2 # update the particle state based on odometry and a random perturbation # Straightforward code: # # for i=1:self.nparticles # x = self.robot.f( self.x(i,:), odo)' + sqrt(self.R)*self.randn[2,0] # x[2] = angdiff(x[2]) # self.x(i,:) = x # # Vectorized code: self.x = self.robot.f(self.x, odo) + \ self.random.multivariate_normal((0, 0, 0), self.R, size=self.nparticles) self.x[:, 2] = base.angdiff(self.x[:, 2]) def _observe(self, z, lm_id): # step 3 # predict observation and score the particles # Straightforward code: # # for p = 1:self.nparticles # # what do we expect observation to be for this particle? # # use the sensor model h(.) # z_pred = self.sensor.h( self.x(p,:), lm_id) # # # how different is it # innov[0] = z[0] - z_pred[0] # innov[1] = angdiff(z[1], z_pred[1]) # # # get likelihood (new importance). Assume Gaussian but any PDF works! # # If predicted obs is very different from actual obs this score will be low # # ie. this particle is not very good at predicting the observation. # # A lower score means it is less likely to be selected for the next generation... # # The weight is never zero. # self.weight(p) = exp(-0.5*innov'*inv(self.L)*innov) + 0.05 # end # # Vectorized code: invL = np.linalg.inv(self.L) z_pred = self.sensor.h(self.x, lm_id) z_pred[:, 0] = z[0] - z_pred[:, 0] z_pred[:, 1] = base.angdiff(z[1], z_pred[:, 1]) LL = -0.5 * np.r_[invL[0,0], invL[1,1], 2*invL[0,1]] e = np.c_[z_pred[:, 0]**2, z_pred[:, 1]**2, z_pred[:,0] * z_pred[:, 1]] @ LL self.weight = np.exp(e) + self.w0 def _select(self): # step 4 # select particles based on their weights # # particles with large weights will occupy a greater percentage of the # y axis in a cummulative plot cdf = np.cumsum(self.weight) / self.weight.sum() # so randomly (uniform) choosing y values is more likely to correspond to # better particles... iselect = self.random.uniform(0, 1, size=(self.nparticles,)) # find the particle that corresponds to each y value (just a look up) interpfun = sp.interpolate.interp1d(cdf, np.arange(self.nparticles), assume_sorted=True, kind='nearest', fill_value='extrapolate') inextgen = interpfun(iselect).astype( # copy selected particles for next generation.. self.x = self.x[inextgen, :]
[docs] def get_t(self): """ Get time from simulation :return: simulation time vector :rtype: ndarray(n) Return simulation time vector, starts at zero. The timestep is an attribute of the ``robot`` object. """ return np.array([h.t for h in self._history])
[docs] def get_xyt(self): r""" Get estimated vehicle trajectory :return: vehicle trajectory where each row is configuration :math:`(x, y, \theta)` :rtype: ndarray(n,3) :seealso: :meth:`plot_xy` :meth:`run` :meth:`history` """ return np.array([h.xest[:2] for h in self._history])
[docs] def get_std(self): r""" Get standard deviation of particles :return: standard deviation of vehicle position estimate :rtype: ndarray(n,2) Return the standard deviation :math:`(\sigma_x, \sigma_y)` of the particle cloud at each time step. :seealso: :meth:`get_xyt` """ return np.array([h.std for h in self._history])
[docs] def plot_xy(self, block=False, **kwargs): r""" Plot estimated vehicle position :param args: position arguments passed to :meth:`~matplotlib.axes.Axes.plot` :param kwargs: keywords arguments passed to :meth:`~matplotlib.axes.Axes.plot` :param block: hold plot until figure is closed, defaults to False :type block: bool, optional Plot the estimated vehicle path in the xy-plane. :seealso: :meth:`get_xy` """ xyt = self.get_xyt() plt.plot(xyt[:, 0], xyt[:, 1], **kwargs)