"""
See Wikipedia article (https://en.wikipedia.org/wiki/D*)
"""
import math
from sys import maxsize
from collections import namedtuple
from enum import IntEnum, auto
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.cm as cm
from roboticstoolbox.mobile.PlannerBase import PlannerBase
from roboticstoolbox.mobile.OccGrid import BinaryOccupancyGrid, OccupancyGrid
import heapq
import bisect
import math
show_animation = True
# ======================================================================== #
# The following code is modified from Python Robotics
# https://github.com/AtsushiSakai/PythonRobotics/tree/master/PathPlanning
# D* grid planning
# Author: Nirnay Roy
# Copyright (c) 2016 - 2022 Atsushi Sakai and other contributors: https://github.com/AtsushiSakai/PythonRobotics/contributors
# Released under the MIT license: https://github.com/AtsushiSakai/PythonRobotics/blob/master/LICENSE
# for performance reasons there are some important differences compared to
# the version from Python Robotics:
#
# 1. replace the classic D* functions min__State(), get_kmin(), insert(), remove()
# with heapq.heappush() and heapq.heappop(). The open_list is now a list of
# tuples (k, _State) maintained by heapq, rather than a set
#
# 2. use enums rather than strings for cell state
#
# 3. lots of unnecessary inserting/deleting from the open_list due to arithmetic
# rounding in tests for costs h and k:
# - replace equality tests with math.isclose() which is faster than np.isclose()
# - add an offset to inequality tests, X > Y becomes X > Y + tol
class _Tag(IntEnum):
NEW = auto()
OPEN = auto()
CLOSED = auto()
class _State:
def __init__(self, x, y):
self.x = x
self.y = y
self.parent = None # 'back pointer' to next _State
self.t = _Tag.NEW # open closed new
self.h = 0 # cost to goal
self.k = 0 # estimate of shortest path cost
def __str__(self):
return f"({self.x}, {self.y})" # [{self.h:.1f}, {self.k:.1f}]"
def __repr__(self):
return self.__str__()
def __lt__(self, other):
return True
class _Map:
def __init__(self, row, col):
self.row = row
self.col = col
self._Map = self.init__Map()
def init__Map(self):
_Map_list = [] # list of rows
for i in range(self.row):
# for each row, make a list
tmp = []
for j in range(self.col):
tmp.append(_State(j, i)) # append column to the row
_Map_list.append(tmp) # append row to _Map
return _Map_list
_neighbours = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]
def get_neighbors(self, _State):
_State_list = []
for i, j in self._neighbours:
try:
if _State.x + i >= 0 and _State.y + j >= 0:
_State_list.append(self._Map[_State.y + j][_State.x + i])
except IndexError:
pass
return _State_list
_root2 = np.sqrt(2)
def cost(self, _State1, _State2):
c = (
self.costmap[_State1.y, _State1.x] + self.costmap[_State2.y, _State2.x]
) / 2
dx = _State1.x - _State2.x
dy = _State1.y - _State2.y
if dx == 0 or dy == 0:
# NSEW movement, distance of 1
return c
else:
# diagonal movement, distance of sqrt(2)
return c * self._root2
# return c * math.sqrt(()**2 +
# ()**2)
def show_h(self):
h = np.empty((self.col, self.row))
for x in range(self.col):
for y in range(self.row):
h[y, x] = self._Map[y][x].h
print(h)
class _Dstar:
def __init__(self, _Map, tol=1e-6):
self._Map = _Map
# self.open_list = set()
self.open_list = []
self.nexpand = 0
self.tol = tol
def process__State(self, verbose=False):
if verbose:
print(
"FRONTIER ", " ".join([f"({x[1].x}, {x[1].y})" for x in self.open_list])
)
# get _State from frontier
if len(self.open_list) == 0:
if verbose:
print(" x is None ")
return -1
k_old, x = heapq.heappop(self.open_list)
x.t = _Tag.CLOSED
self.nexpand += 1
if verbose:
print(f"EXPAND {x}, {k_old:.1f}")
if x.h > k_old + self.tol:
# RAISE _State
if verbose:
print(" raise")
for y in self._Map.get_neighbors(x):
if (
y.t is not _Tag.NEW
and y.h <= k_old - self.tol
and x.h > y.h + self._Map.cost(x, y) + self.tol
):
if verbose:
print(
f" {x} cost from {x.h:.1f} to {y.h + self._Map.cost(x, y):.1f}; parent from {x.parent} to {y}"
)
x.parent = y
x.h = y.h + self._Map.cost(x, y)
if math.isclose(x.h, k_old, rel_tol=0, abs_tol=self.tol):
# normal _State
if verbose:
print(" normal")
for y in self._Map.get_neighbors(x):
if (
y.t is _Tag.NEW
or (
y.parent == x
and not math.isclose(
y.h, x.h + self._Map.cost(x, y), rel_tol=0, abs_tol=self.tol
)
)
or (y.parent != x and y.h > x.h + self._Map.cost(x, y) + self.tol)
):
if verbose:
print(f" reparent {y} from {y.parent} to {x}")
y.parent = x
self.insert(y, x.h + self._Map.cost(x, y))
else:
# RAISE or LOWER _State
if verbose:
print(" raise/lower")
for y in self._Map.get_neighbors(x):
if y.t is _Tag.NEW or (
y.parent == x
and not math.isclose(
y.h, x.h + self._Map.cost(x, y), rel_tol=0, abs_tol=self.tol
)
):
if verbose:
print(
f" {y} cost from {y.h:.1f} to {y.h + self._Map.cost(x, y):.1f}; parent from {y.parent} to {x}; add to frontier"
)
y.parent = x
self.insert(y, x.h + self._Map.cost(x, y))
else:
if (
y.parent != x
and y.h > x.h + self._Map.cost(x, y) + self.tol
and x.t is _Tag.CLOSED
):
self.insert(x, x.h)
if verbose:
print(f" {x}, {x.h:.1f} add to frontier")
else:
if (
y.parent != x
and x.h > y.h + self._Map.cost(y, x) + self.tol
and y.t is _Tag.CLOSED
and y.h > k_old + self.tol
):
self.insert(y, y.h)
if verbose:
print(f" {y}, {y.h:.1f} add to frontier")
if verbose:
print()
if len(self.open_list) == 0:
return -1
else:
return self.open_list[0][0]
ninsert = 0
nin = 0
def insert(self, _State, h_new):
self.ninsert += 1
if _State.t is _Tag.NEW:
_State.k = h_new
elif _State.t is _Tag.OPEN:
k_new = min(_State.k, h_new)
if _State.k == k_new:
# k hasn't changed, and vertex already in frontier
# just update h and be done
_State.h = h_new
return
else:
# k has changed, we need to remove the vertex from the list
# and re-insert it
_State.k = k_new
# scan the list to find vertex, then remove it
# this is quite slow
for i, item in enumerate(self.open_list):
if item[1] is _State:
del self.open_list[i]
break
# _State.k = min(_State.k, h_new)
# # remove the item from the open list
# # print('node already in open list, remove it first')
# #TODO use bisect on old _State.k to find the entry
# for i, item in enumerate(self.open_list):
# if item[1] is _State:
# del self.open_list[i]
# break
elif _State.t is _Tag.CLOSED:
_State.k = min(_State.h, h_new)
_State.h = h_new
_State.t = _Tag.OPEN
# self.open_list.add(_State)
heapq.heappush(self.open_list, (_State.k, _State))
def modify_cost(self, x, newcost):
self._Map.costmap[x.y, x.x] = newcost
if x.t is _Tag.CLOSED:
self.insert(x, x.parent.h + self._Map.cost(x, x.parent))
if len(self.open_list) == 0:
return -1
else:
# lowest priority item is always at index 0 according to docs
return self.open_list[0][0]
def showparents(self):
for y in range(self._Map.row - 1, -1, -1):
if y == self._Map.row - 1:
print(" ", end="")
for x in range(self._Map.col):
print(f" {x} ", end="")
print()
print(f"{y}: ", end="")
for x in range(self._Map.col):
x = self._Map._Map[y][x]
par = x.parent
if par is None:
print(" G ", end="")
else:
print(f"({par.x},{par.y}) ", end="")
print()
print()
# ====================== RTB wrapper ============================= #
# Copyright (c) 2022 Peter Corke: https://github.com/petercorke/robotics-toolbox-python
# Released under the MIT license: https://github.com/AtsushiSakai/PythonRobotics/blob/master/LICENSE
[docs]class DstarPlanner(PlannerBase):
r"""
D* path planner
:param costmap: traversability costmap
:type costmap: OccGrid or ndarray(w,h)
:param kwargs: common planner options, see :class:`PlannerBase`
================== ========================
Feature Capability
================== ========================
Plan :math:`\mathbb{R}^2`, discrete
Obstacle avoidance Yes, occupancy grid
Curvature Discontinuous
Motion Omnidirectional
================== ========================
Creates a planner that finds the minimum-cost path between two points in the
plane using omnidirectional motion. The path comprises a set of 8-way
connected points in adjacent cells.
The map is described by a 2D ``costmap`` whose elements indicate the cost
of traversing that cell. The cost of diagonal traverse is :math:`\sqrt{2}`
the value of the cell. An infinite cost indicates an untraversable cell
or obstacle.
Example:
.. runblock:: pycon
>>> from roboticstoolbox import DstarPlanner
>>> import numpy as np
>>> costmap = np.ones((6, 6));
>>> costmap[2:5, 3:5] = 10
>>> ds = DstarPlanner(costmap, goal=(1, 1));
>>> ds.plan()
>>> path, status = ds.query(start=(5, 4))
>>> print(path.T)
>>> print(status)
:thanks: based on D* grid planning included from `Python Robotics <https://github.com/AtsushiSakai/PythonRobotics/tree/master/PathPlanning>`_
:seealso: :class:`PlannerBase`
"""
def __init__(self, costmap=None, **kwargs):
super().__init__(ndims=2, **kwargs)
if isinstance(costmap, np.ndarray):
self.costmap = costmap
elif isinstance(costmap, OccupancyGrid):
self.costmap = costmap.grid
elif self.occgrid is not None:
self.costmap = np.where(self.occgrid.grid > 0, np.inf, 1)
else:
raise ValueError("unknown type of map")
self._Map = _Map(self.costmap.shape[0], self.costmap.shape[1])
self._Map.costmap = self.costmap
self._Dstar = _Dstar(self._Map) # , tol=0)
[docs] def plan(self, goal=None, animate=False, progress=True, summary=False):
r"""
Plan D* path
:param goal: goal position :math:`(x, y)`, defaults to previously set value
:type goal: array_like(2), optional
:param animate: animate the planning algorithm iterations, defaults to False
:type animate: bool, optional
:param progress: show progress bar, defaults to True
:type progress: bool, optional
Compute the minimum-cost obstacle-free distance to the goal from all
points in the grid.
"""
if goal is not None:
self.goal = goal
if self._goal is None:
raise ValueError("No goal specified here or in constructor")
self._goal = self._goal.astype(int)
goal_State = self._Map._Map[self._goal[1]][self._goal[0]]
self.goal_State = goal_State
# self._Dstar.open_list.add(goal_State)
self._Dstar.insert(goal_State, 0)
while True:
ret = self._Dstar.process__State()
# print('plan', ret, len(self._Dstar.open_list))
if ret == -1:
break
if summary:
print(self._Dstar.ninsert, self._Dstar.nin)
@property
def nexpand(self):
"""
Number of node expansions
:return: number of expansions
:rtype: int
This number will increase during initial planning, and also if
replanning is invoked during the :meth:`query`.
"""
return self._Dstar.nexpand
[docs] def query(self, start, sensor=None, animate=False, verbose=False):
"""
Find path with replanning
:param start: start position :math:`(x,y)`
:type start: array_like(2)
:param sensor: sensor function, defaults to None
:type sensor: callable, optional
:param animate: animate the motion of the robot, defaults to False
:type animate: bool, optional
:param verbose: display detailed diagnostic information about D* operations, defaults to False
:type verbose: bool, optional
:return: path from start to goal, one point :math:`(x, y)` per row
:rtype: ndarray(N,2)
If ``sensor`` is None then the plan determined by the ``plan`` phase
is used unaltered.
If ``sensor`` is not None it must be callable, and is called at each
step of the path with the current robot coordintes:
sensor((x, y))
and mimics the behaviour of a simple sensor onboard the robot which can
dynamically change the costmap. The function return a list (0 or more)
of 3-tuples (x, y, newcost) which are the coordinates of cells and their
cost. If the cost has changed this will trigger D* incremental
replanning. In this case the value returned by :meth:`nexpand` will
increase, according to the severity of the replanning.
:seealso: :meth:`plan`
"""
self.start = start
start_State = self._Map._Map[start[1]][start[0]]
s = start_State
s = s.parent
tmp = start_State
if sensor is not None and not callable(sensor):
raise ValueError("sensor must be callable")
cost = tmp.h
self.goal_State.h = 0
path = []
while True:
path.append((tmp.x, tmp.y))
if tmp == self.goal_State:
break
# x, y = tmp.parent.x, tmp.parent.y
if sensor is not None:
changes = sensor((tmp.x, tmp.y))
if changes is not None:
# make changes to the plan
for x, y, newcost in changes:
X = self._Dstar._Map._Map[y][x]
# print(f"change cost at ({x}, {y}) to {newcost}")
val = self._Dstar.modify_cost(X, newcost)
# propagate the changes to plan
print("propagate")
# self._Dstar.showparents()
while val != -1 and val < tmp.h:
val = self._Dstar.process__State(verbose=verbose)
# print('open set', len(self._Dstar.open_list))
# self._Dstar.showparents()
tmp = tmp.parent
# print('move to ', tmp)
status = namedtuple(
"_DstarStatus",
[
"cost",
],
)
return np.array(path), status(cost)
# # Just feed self._h into plot function from parent as p
# def next(self, current):
# if not self._valid_plan:
# Error("Cost _Map has changed, replan")
# # x = sub2ind(np.shape(self._costmap), current[1], current[0])
# # x = self._b[x]
# i = np.ravel_multi_index([current[1], current[0]], self._costmap.shape)
# i = self._b[i]
# if i == 0:
# return None # we have arrived
# else:
# x = np.unravel_index((i), self._costmap.shape)
# return x[1], x[0]
if __name__ == "__main__":
og = np.zeros((10, 10))
og[4:8, 3:6] = 1
print(og)
ds = DstarPlanner(occgrid=og)
print(ds.costmap)
start = (1, 1)
goal = (7, 6)
ds.plan(goal=goal)
ds._Map.show_h()
# path, status = ds.query(start=start)
# print(path)
# print(status)
# ds.plot(path=path)
def sensorfunc(pos):
if pos == (3, 3):
changes = []
for x in range(3, 6):
for y in range(0, 4):
changes.append((x, y, 100))
return changes
path2, status2 = ds.query(start=start, sensor=sensorfunc, verbose=False)
print(ds._Map.costmap)
ds._Map.show_h()
# ds._Dstar.replan()
# path2, status = ds.query(start=start)
print(path2)
print(status2)
ds.plot(path=path2)
# ds.plot(path=path2, block=True)
# obstacle zone
# m.set_cost([30, 60, 20, 60], np.inf)
# start = [10, 10]
# goal = [70, 70]
# if show_animation:
# m.plot()
# plt.plot(start[0], start[1], "og")
# plt.plot(goal[0], goal[1], "xb")
# plt.axis("equal")
# start = m._Map[start[0]][start[1]]
# end = m._Map[goal[0]][goal[1]]
# _Dstar = _Dstar(m)
# rx, ry = _Dstar.run(start, end)
# if show_animation:
# plt.plot(rx, ry, "-r")
# plt.show(block=True)
# # costly zone
# m.set_cost([30, 40, 60, 80], 1.70, modify=_Dstar)
# _Dstar.replan()
plt.show(block=True)