Source code for PGraph

from abc import ABC
import sys
import numpy as np
import matplotlib.pyplot as plt
import copy
from collections.abc import Iterable
import tempfile
import subprocess
import webbrowser

from spatialmath.base.graphics import axes_logic


class PGraph(ABC):
    """
    Abstract superclass for graphs

    Stores the vertices (an insertion-ordered list plus a name-keyed
    dictionary), the set of edges, and the distance metric and heuristic.
    """

    def __init__(self, arg=None, metric=None, heuristic=None, verbose=False):
        """
        Create an empty graph

        :param arg: not used by this constructor
        :param metric: distance metric, defaults to "L2"
        :type metric: callable or str, optional
        :param heuristic: heuristic distance metric, defaults to ``metric``
        :type heuristic: callable or str, optional
        :param verbose: print a message when vertices and edges are added
        :type verbose: bool, optional
        """
        self._verbose = verbose

        # vertices are held twice: the list keeps the order of addition,
        # the dict gives lookup by name
        self._vertexlist = []
        self._vertexdict = {}
        self._edgelist = set()

        self._ncomponents = 0
        self._connectivitychange = False

        self.metric = "L2" if metric is None else metric
        self.heuristic = self.metric if heuristic is None else heuristic

    def __str__(self):
        """
        One-line summary: class name and vertex, edge and component counts
        """
        n, ne, nc = self.n, self.ne, self.nc
        vertex_word = "vertex" if n == 1 else "vertices"
        edge_suffix = "" if ne == 1 else "s"
        component_suffix = "" if nc == 1 else "s"
        return (
            f"{self.__class__.__name__}: {n} {vertex_word}, "
            f"{ne} edge{edge_suffix}, {nc} component{component_suffix}"
        )

    def __repr__(self):
        """
        Same text as :meth:`__str__`
        """
        return str(self)
@classmethod
def Dict(cls, d, reverse=False):
    """
    Create graph from parent/child dictionary

    :param d: dictionary that maps from ``Vertex`` subclass (or vertex name)
        to its parent ``Vertex`` subclass (or vertex name)
    :type d: dict
    :param reverse: reverse link direction, defaults to False
    :type reverse: bool, optional
    :return: graph
    :rtype: UGraph or DGraph

    Behaves like a constructor for a ``DGraph`` or ``UGraph`` from a
    dictionary that maps vertices to parents.  From this information it
    can create a tree graph.

    By default parent vertices are linked to their children. If ``reverse``
    is True then children are linked to their parents.
    """
    g = cls()

    def lookup_or_create(item):
        # keys and values may be names or objects carrying a ``name``
        label = item if isinstance(item, str) else item.name
        if label in g:
            return g[label]
        return g.add_vertex(UVertex(), name=label)

    for child, parent in d.items():
        child_vertex = lookup_or_create(child)
        parent_vertex = lookup_or_create(parent)

        if reverse:
            g.add_edge(child_vertex, parent_vertex)
        else:
            g.add_edge(parent_vertex, child_vertex)

    return g
@classmethod
def Adjacency(cls, A, coords=None, names=None):
    """
    Create graph from adjacency matrix

    :param A: adjacency matrix
    :type A: ndarray(N,N)
    :param coords: coordinates of vertices, defaults to None
    :type coords: ndarray(N,M), optional
    :param names: names of vertices, defaults to None
    :type names: list(N) of str, optional
    :raises ValueError: ``A`` is not square, ``names`` or ``coords`` has
        the wrong length, or a directed graph has an entry on the diagonal
    :return: graph with one vertex per row of ``A``
    :rtype: instance of the class the method is called on

    Create a directed or undirected graph where positive elements
    ``A[i,j]`` correspond to edges from vertex ``i`` to vertex ``j`` with
    cost ``A[i,j]``.

    .. warning:: For undirected graph ``A`` should be symmetric but
        this is not checked.  Only the upper triangular part is used.
    """
    nrows, ncols = A.shape[0], A.shape[1]

    if nrows != ncols:
        raise ValueError("Adjacency matrix must be square")
    if names is not None and len(names) != nrows:
        raise ValueError("length of names must match dimension of adjacency matrix")
    if coords is not None and coords.shape[0] != nrows:
        raise ValueError("coords must have same number of rows as adjacency matrix")

    g = cls()

    # one vertex per matrix row, in row order
    for row in range(nrows):
        name = None if names is None else names[row]
        coord = None if coords is None else coords[row, :]
        g.add_vertex(name=name, coord=coord)

    if isinstance(g, UGraph):
        # undirected graph: strictly upper triangle only
        for row in range(nrows):
            for col in range(row + 1, ncols):
                weight = A[row, col]
                if weight > 0:
                    g[row].connect(g[col], cost=weight)
    else:
        # directed graph: every off-diagonal element
        for row in range(nrows):
            for col in range(ncols):
                weight = A[row, col]
                if weight > 0:
                    if row == col:
                        raise ValueError("loops in graph not supported")
                    g[row].connect(g[col], cost=weight)

    return g
def copy(self):
    """
    Deepcopy of graph

    :return: deep copy
    :rtype: PGraph
    """
    # imported by name so the lookup does not depend on what ``copy``
    # is bound to in the enclosing scope
    from copy import deepcopy

    duplicate = deepcopy(self)
    return duplicate
def add_vertex(self, vertex, name=None):
    """
    Add a vertex to the graph (superclass method)

    :param vertex: vertex to add
    :type vertex: Vertex subclass
    :param name: name of vertex
    :type name: str
    :return: the vertex that was added
    :rtype: Vertex subclass

    ``G.add_vertex(v)`` add vertex ``v`` to the graph ``G``.

    The name is, in order of preference: the ``name`` argument, the name
    the vertex already has, or a default ``#N`` where ``N`` is the number
    of vertices already in the graph.

    The vertex is placed into a dictionary with a key equal to its name.
    """
    label = vertex.name if name is None else name
    if label is None:
        label = f"#{len(self._vertexlist)}"
    vertex.name = label

    self._vertexlist.append(vertex)
    self._vertexdict[vertex.name] = vertex
    if self._verbose:
        print(f"New vertex {vertex.name}: {vertex.coord}")

    vertex._graph = self
    self._connectivitychange = True
    return vertex
def add_edge(self, v1, v2, **kwargs):
    """
    Add an edge to the graph (superclass method)

    :param v1: first vertex (start if a directed graph)
    :type v1: Vertex subclass or str
    :param v2: second vertex (end if a directed graph)
    :type v2: Vertex subclass or str
    :param kwargs: optional arguments to pass to ``Vertex.connect``
    :raises TypeError: ``v1`` or ``v2`` is neither a vertex nor a name
    :return: edge
    :rtype: Edge

    Create an edge between a vertex pair and adds it to the graph.  A
    vertex given by name is looked up in this graph.

    This is a graph centric way of creating an edge.  The
    alternative is the ``connect`` method of a vertex.

    :seealso: :meth:`Edge.connect` :meth:`Vertex.connect`
    """
    if isinstance(v1, str):
        start = self[v1]
    elif isinstance(v1, Vertex):
        start = v1
    else:
        raise TypeError("v1 must be Vertex subclass or string name")

    if isinstance(v2, str):
        end = self[v2]
    elif isinstance(v2, Vertex):
        end = v2
    else:
        raise TypeError("v2 must be Vertex subclass or string name")

    if self._verbose:
        print(f"New edge from {start.name} to {end.name}")

    return start.connect(end, **kwargs)
def remove(self, x):
    """
    Remove element from graph (superclass method)

    :param x: element to remove from graph
    :type x: Edge or Vertex subclass
    :raises TypeError: unknown type

    The edge or vertex is removed, and all references and lists are
    updated.  Removing a vertex first removes every edge in that vertex's
    own edge list.

    .. warning:: The connectivity of the network may be changed.
    """
    if isinstance(x, Edge):
        # remove edge from the edgelist of connected vertices
        x.v1._edgelist.remove(x)
        x.v2._edgelist.remove(x)

        # indicate that connectivity has changed
        x.v1._connectivitychange = True
        x.v2._connectivitychange = True
        self._connectivitychange = True

        # remove references to the vertices
        x.v1 = None
        x.v2 = None

        # remove from list of all edges
        self._edgelist.remove(x)

    elif isinstance(x, Vertex):
        # remove all edges of this vertex; iterate over a snapshot because
        # removing an edge edits the list being walked
        for edge in list(x._edgelist):
            self.remove(edge)

        # remove from list and dict of all vertices
        self._vertexlist.remove(x)
        del self._vertexdict[x.name]

        # a vertex with no edges never passes through the edge branch, so
        # flag the change here to keep the component count from going stale
        self._connectivitychange = True

    else:
        raise TypeError("expecting Edge or Vertex")
def show(self):
    """
    Print the vertices then the edges of the graph, one per line
    """
    sections = (("vertices:", self._vertexlist), ("edges:", self._edgelist))
    for heading, items in sections:
        print(heading)
        for item in items:
            print(" " + str(item))
@property
def n(self):
    """
    Number of vertices

    :return: Number of vertices
    :rtype: int
    """
    return len(self._vertexdict)


@property
def ne(self):
    """
    Number of edges

    :return: Number of edges
    :rtype: int
    """
    return len(self._edgelist)


@property
def nc(self):
    """
    Number of components

    :return: Number of components
    :rtype: int

    .. note::

        - Components are labeled from 0 to ``g.nc-1``.
        - The graph coloring algorithm is invoked on every access; it
          decides for itself whether the labels need to be recomputed,
          which is potentially expensive for a large graph.
    """
    self._graphcolor()
    return self._ncomponents


def _metricfunc(self, metric):
    """
    Resolve a metric specification to a callable

    :param metric: a function of a vector returning a scalar, or one of
        the names 'L1', 'L2' or 'SE2'
    :type metric: callable or str
    :raises ValueError: ``metric`` is a string but not a known name
    :raises TypeError: ``metric`` is neither callable nor a string
    :return: the metric function
    :rtype: callable
    """

    def L1(v):
        return np.linalg.norm(v, 1)

    def L2(v):
        return np.linalg.norm(v)

    def SE2(v):
        # work on a float copy so the caller's vector is left untouched
        v = np.array(v, dtype=float)
        # wrap angle to range [-pi, pi)
        v[2] = (v[2] + np.pi) % (2 * np.pi) - np.pi
        return np.linalg.norm(v)

    if callable(metric):
        return metric
    if isinstance(metric, str):
        named = {"L1": L1, "L2": L2, "SE2": SE2}
        if metric not in named:
            raise ValueError("unknown metric")
        return named[metric]
    # previously fell through and silently installed None as the metric
    raise TypeError("metric must be callable or str")


@property
def metric(self):
    """
    Get the distance metric for graph

    :return: distance metric
    :rtype: callable

    This is a function of a vector and returns a scalar.
    """
    return self._metric


@metric.setter
def metric(self, metric):
    r"""
    Set the distance metric for graph

    :param metric: distance metric
    :type metric: callable or str

    This is a function of a vector and returns a scalar.  It can be
    user defined function or a string:

    - 'L1' is the norm :math:`L_1 = \Sigma_i | v_i |`
    - 'L2' is the norm :math:`L_2 = \sqrt{ \Sigma_i v_i^2}`
    - 'SE2' is a mixed norm for vectors :math:`(x, y, \theta)` and
      is :math:`\sqrt{x^2 + y^2 + \bar{\theta}^2}` where :math:`\bar{\theta}`
      is :math:`\theta` wrapped to the interval :math:`[-\pi, \pi)`

    The metric is used by :meth:`closest` and :meth:`distance`
    """
    self._metric = self._metricfunc(metric)


@property
def heuristic(self):
    """
    Get the heuristic distance metric for graph

    :return: heuristic distance metric
    :rtype: callable

    This is a function of a vector and returns a scalar.
    """
    return self._heuristic


@heuristic.setter
def heuristic(self, heuristic):
    r"""
    Set the heuristic distance metric for graph

    :param heuristic: heuristic distance metric
    :type heuristic: callable or str

    This is a function of a vector and returns a scalar.  It can be
    user defined function or a string:

    - 'L1' is the norm :math:`L_1 = \Sigma_i | v_i |`
    - 'L2' is the norm :math:`L_2 = \sqrt{ \Sigma_i v_i^2}`
    - 'SE2' is a mixed norm for vectors :math:`(x, y, \theta)` and
      is :math:`\sqrt{x^2 + y^2 + \bar{\theta}^2}` where :math:`\bar{\theta}`
      is :math:`\theta` wrapped to the interval :math:`[-\pi, \pi)`

    The heuristic distance is only used by the A* planner :meth:`path_Astar`.
    """
    self._heuristic = self._metricfunc(heuristic)


def __repr__(self):
    """
    One line per vertex: name, coordinate and, once the graph has been
    colored, the component label
    """
    lines = []
    for vertex in self:
        text = f"{vertex.name} at {vertex.coord}"
        if vertex.label is not None:
            # was a plain string, so the braces were printed literally
            text += f" component={vertex.label}"
        lines.append(text)
    return "\n".join(lines)


def __getitem__(self, i):
    """
    Get vertex (superclass method)

    :param i: vertex description
    :type i: int or str or Vertex subclass
    :return: the referenced vertex, or None if ``i`` is of another type
    :rtype: Vertex subclass

    Retrieve a vertex by index or name:

    - ``g[i]`` is the i'th vertex in the graph.  This reflects the order of
      addition to the graph.  NumPy integers are accepted as well as
      ``int``.
    - ``g[s]`` is vertex named ``s``
    - ``g[v]`` is ``v`` where ``v`` is a ``Vertex`` subclass

    This method also supports iteration over the vertices in a graph::

        for v in g:
            print(v)

    will iterate over all the vertices.
    """
    if isinstance(i, (int, np.integer)):
        return self._vertexlist[i]
    if isinstance(i, str):
        return self._vertexdict[i]
    if isinstance(i, Vertex):
        return i
    return None


def __contains__(self, item):
    """
    Test if vertex in graph

    :param item: vertex or name of vertex
    :type item: Vertex subclass or str
    :return: true if vertex exists in the graph
    :rtype: bool

    - ``'name' in graph`` is true if a vertex named ``'name'`` exists in the
      graph.
    - ``v in graph`` is true if the vertex reference ``v`` exists in the
      graph.
    """
    if isinstance(item, str):
        return item in self._vertexdict
    if isinstance(item, Vertex):
        return item in self._vertexdict.values()
    return False
def closest(self, coord):
    """
    Vertex closest to point

    :param coord: coordinates of a point
    :type coord: ndarray(n)
    :return: closest vertex and its distance from the point; ``(None, inf)``
        if the graph has no vertices
    :rtype: tuple of (Vertex subclass, float)

    Distance is computed according to the graph's metric.  If several
    vertices are equally close the first one added to the graph is returned.

    :seealso: :meth:`metric`
    """
    nearest = None
    nearest_distance = np.inf

    for candidate in self:
        separation = self.metric(candidate.coord - coord)
        if separation < nearest_distance:
            nearest, nearest_distance = candidate, separation

    return nearest, nearest_distance
def edges(self):
    """
    Get all edges in graph (superclass method)

    :return: All edges in the graph
    :rtype: collection of Edge references

    The graph's own edge collection is returned, not a copy.  We can
    iterate over all edges in the graph by::

        for e in g.edges():
            print(e)

    .. note:: The ``edges()`` of a Vertex is a list of all edges connected
        to that vertex.

    :seealso: :meth:`Vertex.edges`
    """
    all_edges = self._edgelist
    return all_edges
def plot(
    self,
    colorcomponents=True,
    force2d=False,
    vopt=None,
    eopt=None,
    text=None,
    block=False,
    grid=True,
    ax=None,
):
    """
    Plot the graph

    :param colorcomponents: color vertices and edges by component,
        defaults to True
    :type colorcomponents: bool, optional
    :param force2d: plot only (x, y) even if vertices have more
        coordinates, defaults to False
    :type force2d: bool, optional
    :param vopt: vertex format, defaults to 12pt o-marker
    :type vopt: dict, optional
    :param eopt: edge format, defaults to linewidth 3
    :type eopt: dict, optional
    :param text: text label format, defaults to None
    :type text: False or dict, optional
    :param block: block until figure is dismissed, defaults to False
    :type block: bool, optional
    :param grid: passed to the axes ``grid`` method, defaults to True
    :type grid: bool, optional
    :param ax: axes to draw into, defaults to axes obtained from
        ``axes_logic``
    :type ax: matplotlib axes, optional

    The graph is plotted using matplotlib.

    If ``colorcomponents`` is True then each component is assigned a unique
    color.  A ``color`` entry in ``vopt`` or ``eopt`` takes precedence over
    the component color.

    If ``text`` is a dict it is used to format text labels for the vertices
    which are the vertex names.  If ``text`` is None default formatting is
    used.  If ``text`` is False no labels are added.
    """
    vertex_style = {**dict(marker="o", markersize=12), **(vopt or {})}
    edge_style = {**dict(linewidth=3), **(eopt or {})}
    if text is None:
        # documented as "default formatting"; previously crashed on **None
        text = {}

    ncomponents = self.nc
    if colorcomponents:
        color = plt.cm.coolwarm(np.linspace(0, 1, ncomponents))

    # 2D when asked for, when vertices are 2D, or when there is nothing to
    # inspect; otherwise plot the first three coordinates
    flat = force2d or self.n == 0 or len(self[0].coord) == 2
    if ax is None:
        ax = axes_logic(ax, 2 if flat else 3)

    def position(vertex):
        # coordinates used for plotting, as a tuple of 2 or 3 scalars
        if flat:
            return (vertex.x, vertex.y)
        return (vertex.x, vertex.y, vertex.z)

    for c in range(ncomponents):
        if colorcomponents:
            # user options come last so an explicit color overrides ours
            vstyle = {**dict(color=color[c, :]), **vertex_style}
            estyle = {**dict(color=color[c, :]), **edge_style}
        else:
            vstyle, estyle = vertex_style, edge_style

        for vertex in self.component(c):
            here = position(vertex)
            if text is not False:
                ax.text(*here, " " + vertex.name, **text)
            ax.plot(*here, **vstyle)
            for neighbour in vertex.neighbours():
                there = position(neighbour)
                # one [start, end] pair per plotted axis
                ax.plot(*[[p, q] for p, q in zip(here, there)], **estyle)

    ax.grid(grid)
    plt.show(block=block)
def highlight_path(self, path, block=False, **kwargs):
    """
    Highlight a path through the graph

    :param path: vertices along the path, in order
    :type path: sequence of Vertex subclass
    :param block: block until figure is dismissed, defaults to False
    :type block: bool, optional
    :param kwargs: passed to :meth:`highlight_edge` and
        :meth:`highlight_vertex`

    The vertices and edges along the path are overwritten with a different
    size/width and color.

    :seealso: :meth:`highlight_vertex` :meth:`highlight_edge`
    """
    final = len(path) - 1
    for k, vertex in enumerate(path):
        if k < final:
            # edge leaving this vertex toward the next one on the path
            self.highlight_edge(vertex.edgeto(path[k + 1]), **kwargs)
        self.highlight_vertex(vertex, **kwargs)

    plt.show(block=block)
def highlight_edge(self, edge, scale=2, color="r", alpha=0.5):
    """
    Highlight an edge in the graph

    :param edge: The edge to highlight
    :type edge: Edge subclass
    :param scale: Overwrite with a line this much bigger than the original,
                  defaults to 2
    :type scale: float, optional
    :param color: Overwrite with a line in this color, defaults to 'r'
    :type color: str, optional
    :param alpha: opacity of the overlaid line, defaults to 0.5
    :type alpha: float, optional

    Only the x- and y-coordinates of the end vertices are used.
    """
    start, end = edge.v1, edge.v2
    xs = [start.x, end.x]
    ys = [start.y, end.y]
    plt.plot(xs, ys, color=color, linewidth=3 * scale, alpha=alpha)
def highlight_vertex(self, vertex, scale=2, color="r", alpha=0.5):
    """
    Highlight a vertex in the graph

    :param vertex: The vertex to highlight, or an iterable of vertices
        and/or vertex names
    :type vertex: Vertex subclass, or iterable of Vertex subclass or str
    :param scale: Overwrite with a marker this much bigger than the original,
                  defaults to 2
    :type scale: float, optional
    :param color: Overwrite with a marker in this color, defaults to 'r'
    :type color: str, optional
    :param alpha: opacity of the overlaid marker, defaults to 0.5
    :type alpha: float, optional

    Only the x- and y-coordinates of the vertices are used.
    """
    marker_style = dict(color=color, markersize=12 * scale, alpha=alpha)

    if not isinstance(vertex, Iterable):
        plt.plot(vertex.x, vertex.y, "o", **marker_style)
        return

    for item in vertex:
        # names are resolved through the graph, vertices are used directly
        target = self[item] if isinstance(item, str) else item
        plt.plot(target.x, target.y, "o", **marker_style)
def dotfile(self, filename=None, direction=None):
    """
    Export graph as a GraphViz dot file

    :param filename: filename or open file object to save graph to,
        defaults to None which writes to the console
    :type filename: str or file object, optional
    :param direction: if given, written as the GraphViz ``rankdir``
        attribute, defaults to None
    :type direction: str, optional

    ``g.dotfile()`` creates the specified file which contains the
    `GraphViz <https://graphviz.org>`_ code to represent the embedded graph.
    By default output is to the console.

    .. note::

        - The graph is undirected if it is a subclass of ``UGraph``
        - The graph is directed if it is a subclass of ``DGraph``
        - Use ``neato`` rather than dot to get the embedded layout

    .. note:: Only a file opened here from a filename string is closed.
        The console stream and a caller-supplied file object are left open.

    :seealso: :func:`showgraph`
    """
    opened_here = isinstance(filename, str)
    if filename is None:
        f = sys.stdout
    elif opened_here:
        f = open(filename, "w")
    else:
        f = filename

    directed = isinstance(self, DGraph)

    try:
        print("digraph {" if directed else "graph {", file=f)

        if direction is not None:
            print(f"rankdir = {direction}", file=f)

        # add the vertices including name and position
        for vertex in self:
            if vertex.coord is None:
                print(' "{:s}"'.format(vertex.name), file=f)
            else:
                print(
                    ' "{:s}" [pos="{:.5g},{:.5g}"]'.format(
                        vertex.name, vertex.coord[0], vertex.coord[1]
                    ),
                    file=f,
                )
        print(file=f)

        # add the edges
        for e in self.edges():
            if directed:
                print(' "{:s}" -> "{:s}"'.format(e.v1.name, e.v2.name), file=f)
            else:
                print(' "{:s}" -- "{:s}"'.format(e.v1.name, e.v2.name), file=f)

        print("}", file=f)
    finally:
        # never close sys.stdout or a file the caller owns
        if opened_here:
            f.close()
def showgraph(self, **kwargs):
    """
    Display graph in a browser tab

    :param kwargs: arguments passed to :meth:`dotfile`

    ``g.showgraph()`` renders and displays the graph in a browser tab.  The
    graph is exported in `GraphViz <https://graphviz.org>`_ format, rendered
    to PDF using ``dot`` and then displayed in a browser tab.

    If ``dot`` cannot be run, or exits with an error, nothing is displayed
    and the partial PDF file is deleted.  A successfully rendered PDF is
    left in the temporary directory for the browser to read.

    :seealso: :func:`dotfile`
    """
    import os
    from pathlib import Path

    # "w+" so the same handle can be rewound and read by dot
    with tempfile.TemporaryFile(mode="w+") as dotfile:
        self.dotfile(dotfile, **kwargs)

        # rewind the dot file (this also flushes it), then run dot with
        # the PDF written straight into a named file in the filesystem
        dotfile.seek(0)
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as pdffile:
            try:
                # argument list, no shell: nothing here needs shell parsing
                result = subprocess.run(
                    ["dot", "-Tpdf"], stdin=dotfile, stdout=pdffile
                )
                rendered = result.returncode == 0
            except FileNotFoundError:
                # GraphViz is not installed; stay silent as before
                rendered = False

    if rendered:
        # dot ran happily, open the PDF file in browser; as_uri() builds a
        # correctly escaped file URL on every platform
        webbrowser.open(Path(pdffile.name).as_uri())
    else:
        os.remove(pdffile.name)
def iscyclic(self):
    """
    Placeholder, not implemented

    :return: always None
    """
    return None
def average_degree(self):
    r"""
    Average degree of the graph

    :return: average degree, or None if the graph is neither a ``DGraph``
        nor a ``UGraph``
    :rtype: float

    Average degree is :math:`2 E / N` for an undirected graph and
    :math:`E / N` for a directed graph where :math:`E` is the total number of
    edges and :math:`N` is the number of vertices.
    """
    if isinstance(self, DGraph):
        ends_per_edge = 1
    elif isinstance(self, UGraph):
        ends_per_edge = 2
    else:
        return None

    return ends_per_edge * len(self.edges()) / self.n
# --------------------------------------------------------------------------- # # MATRIX REPRESENTATIONS
def Laplacian(self):
    """
    Laplacian matrix for the graph

    :return: Laplacian matrix
    :rtype: NumPy ndarray

    ``g.Laplacian()`` is the Laplacian matrix (NxN) of the graph where N
    is the number of vertices.  It is computed as the degree matrix minus
    the matrix that is true wherever the adjacency matrix is positive.

    .. note::

        - Laplacian is always positive-semidefinite.
        - Laplacian has at least one zero eigenvalue.
        - The number of zero-valued eigenvalues is the number of connected
            components in the graph.

    :seealso: :meth:`adjacency` :meth:`incidence` :meth:`degree`
    """
    D = self.degree()
    connected = self.adjacency() > 0
    return D - connected
def connectivity(self, vertices=None):
    """
    Graph connectivity

    :param vertices: vertices to report on, defaults to every vertex in
        the graph
    :type vertices: iterable of Vertex subclass, optional
    :return: a list with the number of edges per vertex
    :rtype: list

    The average vertex connectivity is::

        mean(g.connectivity())

    and the minimum vertex connectivity is::

        min(g.connectivity())
    """
    selected = self if vertices is None else vertices
    return [len(vertex._edgelist) for vertex in selected]
def degree(self):
    """
    Degree matrix of graph

    :return: degree matrix
    :rtype: ndarray(N,N)

    This is a diagonal matrix where element ``[i,i]`` is the number
    of edges connected to vertex id ``i``, as reported by
    :meth:`connectivity`.

    :seealso: :meth:`adjacency` :meth:`incidence` :meth:`laplacian`
    """
    edges_per_vertex = self.connectivity()
    return np.diag(edges_per_vertex)
def adjacency(self):
    """
    Adjacency matrix of graph

    :returns: adjacency matrix
    :rtype: ndarray(N,N)

    The elements of the adjacency matrix ``[i,j]`` are 1 if vertex ``i`` is
    connected to vertex ``j``, else 0.

    .. note::

        - vertices are numbered in their order of creation. A vertex index
          can be resolved to a vertex reference by ``graph[i]``.
        - for an undirected graph the matrix is symmetric
        - Eigenvalues of ``A`` are real and are known as the spectrum of the graph.
        - The element ``A[i,j]`` can be considered the number of walks of
          length one edge from vertex ``i`` to vertex ``j`` (either zero or one).
        - If ``Ak = A ** k`` the element ``Ak[i,j]`` is the number of
          walks of length ``k`` from vertex ``i`` to vertex ``j``.

    :seealso: :meth:`Laplacian` :meth:`incidence` :meth:`degree`
    """
    # vertex -> row/column number, in order of addition to the graph
    index = {vertex: k for k, vertex in enumerate(self)}

    A = np.zeros((self.n, self.n))
    for vertex, row in index.items():
        for neighbour in vertex.neighbours():
            A[row, index[neighbour]] = 1
    return A
def incidence(self):
    """
    Incidence matrix of graph

    :returns: incidence matrix
    :rtype: ndarray(n,ne)

    The elements of the incidence matrix ``I[i,j]`` are 1 if vertex ``i`` is
    connected to edge ``j``, else 0.

    .. note::

        - vertices are numbered in their order of creation. A vertex index
          can be resolved to a vertex reference by ``graph[i]``.
        - edges are numbered in the order they appear in ``graph.edges()``.

    :seealso: :meth:`Laplacian` :meth:`adjacency` :meth:`degree`
    """
    edges = self.edges()

    # edge -> column number
    column = {edge: j for j, edge in enumerate(edges)}

    I = np.zeros((self.n, len(edges)))
    for row, vertex in enumerate(self):
        # the row is the vertex number; the inner loop must not rebind it
        for edge in vertex.edges():
            I[row, column[edge]] = 1

    return I
def distance(self):
    """
    Distance matrix of graph

    :return: distance matrix
    :rtype: ndarray(n,n)

    The elements of the distance matrix ``D[i,j]`` is the edge cost of moving
    from vertex ``i`` to vertex ``j``. It is zero if the vertices are not
    connected.
    """
    # vertex -> row/column number, in order of addition to the graph
    index = {vertex: k for k, vertex in enumerate(self)}

    D = np.zeros((self.n, self.n))
    for origin, row in index.items():
        for destination, edge in origin.incidences():
            D[row, index[destination]] = edge.cost
    return D
# GRAPH COMPONENTS
def component(self, c):
    """
    All vertices in specified graph component

    :param c: component label
    :type c: int
    :return: vertices whose label equals ``c``, in order of addition
    :rtype: list of Vertex subclass

    ``graph.component(c)`` is a list of all vertices in graph component ``c``.
    """
    self._graphcolor()  # ensure labels are uptodate

    members = []
    for vertex in self:
        if vertex.label == c:
            members.append(vertex)
    return members
def samecomponent(self, v1, v2):
    """
    Test if vertices belong to same graph component

    :param v1: vertex
    :type v1: Vertex subclass
    :param v2: vertex
    :type v2: Vertex subclass
    :return: true if vertices belong to same graph component
    :rtype: bool

    Test whether vertices belong to the same component, that is, whether
    their labels are equal after the graph has been colored.  For a:

    - directed graph this implies a path between them
    - undirected graph there is not necessarily a path between them
    """
    self._graphcolor()  # ensure labels are uptodate

    labels_match = v1.label == v2.label
    return labels_match
# def remove(self, v): # # remove edges from neighbour's edge list # for e in v.edges(): # next = e.next(v) # next._edgelist.remove(e) # next._connectivitychange = True # # remove references from the graph # self._vertexlist.remove(v) # for key, value in self._vertexdict.items(): # if value is v: # del self._vertexdict[key] # break # v._edgelist = [] # remove all references to edges # --------------------------------------------------------------------------- #
def path_BFS(self, S, G, verbose=False, summary=False):
    """
    Breadth-first search for path

    :param S: start vertex
    :type S: Vertex subclass or str
    :param G: goal vertex
    :type G: Vertex subclass or str
    :param verbose: print the frontier and explored lists at every step,
        defaults to False
    :type verbose: bool, optional
    :param summary: print the number of vertices explored, defaults to
        False
    :type summary: bool, optional
    :raises TypeError: ``S`` or ``G`` is neither a vertex nor a name
    :return: list of vertices from S to G inclusive, path length
    :rtype: list of Vertex subclass, float

    Returns a list of vertices that form a path from vertex ``S`` to
    vertex ``G`` if possible, otherwise return None.  If the start is the
    goal the path is just that vertex with length zero.

    The path length is the sum of the cost of each edge from a vertex to
    its successor on the path.
    """
    from collections import deque

    if isinstance(S, str):
        S = self[S]
    elif not isinstance(S, Vertex):
        raise TypeError("start must be Vertex subclass or string name")
    if isinstance(G, str):
        G = self[G]
    elif not isinstance(G, Vertex):
        raise TypeError("goal must be Vertex subclass or string name")

    if S is G:
        # nothing to search for
        return [S], 0

    # the frontier and explored containers keep their order because it is
    # instructive in verbose mode; ``queued`` holds every vertex that has
    # ever been on the frontier so membership tests are cheap
    frontier = deque([S])
    explored = []
    queued = {S}
    parent = {}
    done = False

    while frontier:
        if verbose:
            print()
            print("FRONTIER:", ", ".join([v.name for v in frontier]))
            print("EXPLORED:", ", ".join([v.name for v in explored]))

        x = frontier.popleft()
        if verbose:
            print(" expand", x.name)

        # expand the vertex
        for n in x.neighbours():
            if n is G:
                if verbose:
                    print(" goal", n.name, "reached")
                parent[n] = x
                done = True
                break
            if n not in queued:
                # add it to the frontier
                queued.add(n)
                frontier.append(n)
                if verbose:
                    print(" add", n.name, "to the frontier")
                parent[n] = x
        if done:
            break
        explored.append(x)
        if verbose:
            print(" move", x.name, "to the explored list")
    else:
        # no path
        return None

    # reconstruct the path from goal back to start, then flip it
    path = [G]
    length = 0
    x = G
    while x is not S:
        p = parent[x]
        # edge is looked up in the direction of travel, parent -> child,
        # which is the only direction that exists in a directed graph
        length += p.edgeto(x).cost
        path.append(p)
        x = p
    path.reverse()

    if summary or verbose:
        print(
            f"{len(explored)} vertices explored, {len(frontier)} remaining on the frontier"
        )

    return path, length
def path_UCS(self, S, G, verbose=False, summary=False):
    """
    Uniform cost search for path

    :param S: start vertex
    :type S: Vertex subclass or str
    :param G: goal vertex
    :type G: Vertex subclass or str
    :param verbose: print the frontier and explored lists at every step,
        defaults to False
    :type verbose: bool, optional
    :param summary: print the number of vertices explored, defaults to
        False
    :type summary: bool, optional
    :raises TypeError: ``S`` or ``G`` is neither a vertex nor a name
    :return: list of vertices from S to G inclusive, path length, tree
    :rtype: list of Vertex subclass, float, dict

    Returns a list of vertices that form a path from vertex ``S`` to
    vertex ``G`` if possible, otherwise return None.

    The search tree is returned as dict that maps a vertex name to the
    name of its parent.

    Vertices are expanded in order of accumulated edge cost from the
    start; no heuristic is used.
    """
    if isinstance(S, str):
        S = self[S]
    elif not isinstance(S, Vertex):
        raise TypeError("start must be Vertex subclass or string name")
    if isinstance(G, str):
        G = self[G]
    elif not isinstance(G, Vertex):
        raise TypeError("goal must be Vertex subclass or string name")

    frontier = [S]
    explored = []
    parent = {}
    f = {S: 0}  # evaluation function: cost to come

    while frontier:
        if verbose:
            print()
            print(
                "FRONTIER:", ", ".join([f"{v.name}({f[v]:.0f})" for v in frontier])
            )
            print("EXPLORED:", ", ".join([v.name for v in explored]))

        # minimum f in frontier; int() because argmin returns a NumPy integer
        i = int(np.argmin([f[n] for n in frontier]))
        x = frontier.pop(i)
        if verbose:
            print(" expand", x.name)

        if x is G:
            break

        # expand the vertex
        for n, e in x.incidences():
            fnew = f[x] + e.cost
            if n not in frontier and n not in explored:
                # add it to the frontier
                parent[n] = x
                f[n] = fnew
                frontier.append(n)
                if verbose:
                    print(" add", n.name, "to the frontier")

            elif n in frontier:
                # neighbour is already in the frontier
                # cost of path via x is lower that previous, reparent it
                if fnew < f[n]:
                    if verbose:
                        print(
                            f" reparent {n.name}: cost {fnew} via {x.name} is less than cost {f[n]} via {parent[n].name}, change parent from {parent[n].name} to {x.name} "
                        )
                    f[n] = fnew
                    parent[n] = x

        explored.append(x)
        if verbose:
            print(" move", x.name, "to the explored list")
    else:
        # no path
        return None

    # reconstruct the path from goal back to start, then flip it
    path = [G]
    length = 0
    x = G
    while x is not S:
        p = parent[x]
        length += p.edgeto(x).cost
        path.append(p)
        x = p
    path.reverse()

    parent_names = {v.name: p.name for v, p in parent.items()}

    if summary or verbose:
        print(
            f"{len(explored)} vertices explored, {len(frontier)} remaining on the frontier"
        )

    return path, length, parent_names
def path_Astar(self, S, G, verbose=False, summary=False):
    """
    A* search for path

    :param S: start vertex
    :type S: Vertex subclass or str
    :param G: goal vertex
    :type G: Vertex subclass or str
    :param verbose: print the frontier and explored lists at every step,
        defaults to False
    :type verbose: bool, optional
    :param summary: print the number of vertices explored, defaults to
        False
    :type summary: bool, optional
    :raises TypeError: ``S`` or ``G`` is neither a vertex nor a name
    :return: list of vertices from S to G inclusive, path length, tree
    :rtype: list of Vertex subclass, float, dict

    Returns a list of vertices that form a path from vertex ``S`` to
    vertex ``G`` if possible, otherwise return None.

    The search tree is returned as dict that maps a vertex name to the
    name of its parent.

    Vertices are expanded in order of cost to come plus the value of
    ``heuristic_distance`` from the vertex to the goal.

    :seealso: :meth:`heuristic`
    """
    if isinstance(S, str):
        S = self[S]
    elif not isinstance(S, Vertex):
        raise TypeError("start must be Vertex subclass or string name")
    if isinstance(G, str):
        G = self[G]
    elif not isinstance(G, Vertex):
        raise TypeError("goal must be Vertex subclass or string name")

    frontier = [S]
    explored = []
    parent = {}
    g = {S: 0}  # cost to come
    f = {S: 0}  # evaluation function

    while frontier:
        if verbose:
            print()
            print(
                "FRONTIER:", ", ".join([f"{v.name}({f[v]:.0f})" for v in frontier])
            )
            print("EXPLORED:", ", ".join([v.name for v in explored]))

        # minimum f in frontier; int() because argmin returns a NumPy integer
        i = int(np.argmin([f[n] for n in frontier]))
        x = frontier.pop(i)
        if verbose:
            print(" expand", x.name)

        if x is G:
            break

        # expand the vertex
        for n, e in x.incidences():
            if n not in frontier and n not in explored:
                # add it to the frontier
                frontier.append(n)
                parent[n] = x
                g[n] = g[x] + e.cost  # update cost to come
                f[n] = g[n] + n.heuristic_distance(G)  # heuristic
                if verbose:
                    print(" add", n.name, "to the frontier")

            elif n in frontier:
                # neighbour is already in the frontier
                gnew = g[x] + e.cost
                if gnew < g[n]:
                    # cost of path via x is lower that previous, reparent it
                    if verbose:
                        print(
                            f" reparent {n.name}: cost {gnew} via {x.name} is less than cost {g[n]} via {parent[n].name}, change parent from {parent[n].name} to {x.name} "
                        )
                    g[n] = gnew
                    f[n] = g[n] + n.heuristic_distance(G)  # heuristic
                    parent[n] = x  # reparent

        explored.append(x)
        if verbose:
            print(" move", x.name, "to the explored list")
    else:
        # no path
        return None

    # reconstruct the path from goal back to start, then flip it
    path = [G]
    length = 0
    x = G
    while x is not S:
        p = parent[x]
        length += p.edgeto(x).cost
        path.append(p)
        x = p
    path.reverse()

    parent_names = {v.name: p.name for v, p in parent.items()}

    if summary or verbose:
        print(
            f"{len(explored)} vertices explored, {len(frontier)} remaining on the frontier"
        )

    return path, length, parent_names
# -------------------------------------------------------------------------- #
class UGraph(PGraph):
    """
    Class for undirected graphs

    .. inheritance-diagram:: UGraph
    """

    def add_vertex(self, coord=None, name=None):
        """
        Add vertex to undirected graph

        :param coord: coordinate for an embedded graph, defaults to None
        :type coord: array-like, optional
        :param name: vertex name, defaults to "#i"
        :type name: str, optional
        :return: new vertex
        :rtype: UVertex

        - ``g.add_vertex()`` creates a new vertex with optional ``coord`` and
          ``name``.
        - ``g.add_vertex(v)`` takes an instance or subclass of UVertex and adds
          it to the graph
        """
        # the first argument doubles as "an existing vertex to add"
        vertex = coord if isinstance(coord, UVertex) else UVertex(coord)
        super().add_vertex(vertex, name=name)
        return vertex
@classmethod
def vertex_copy(self, vertex):
    """
    Create a new undirected-graph vertex from an existing vertex

    :param vertex: vertex to take the coordinate and name from
    :type vertex: Vertex subclass
    :return: new vertex, not attached to any graph by this method
    :rtype: UVertex

    NOTE(review): this previously constructed a ``DVertex``, identical to
    ``DGraph.vertex_copy``; an undirected graph should hand back its own
    vertex type.  Assumes ``UVertex`` accepts the same ``coord`` and
    ``name`` keywords as ``DVertex`` -- confirm against the vertex classes.
    """
    return UVertex(coord=vertex.coord, name=vertex.name)
def _graphcolor(self):
    """
    Color the graph

    Performs a depth-first labeling operation, assigning the ``label``
    attribute of every vertex with a sequential integer starting from 0.
    Components are numbered in the order their first vertex was added to
    the graph.  ``_ncomponents`` is set to the number of components, which
    is zero for a graph with no vertices.

    The labeling is only redone if something has changed since the last
    call: the graph's or any vertex's ``_connectivitychange`` flag is set,
    or the number of vertices differs from the number last colored.  All
    of those flags are cleared once the labels are up to date.

    :seealso: :meth:`nc`
    """
    stale = (
        self._connectivitychange
        # catches a vertex removed without any flag being raised
        or getattr(self, "_ncolored", None) != self.n
        or any(vertex._connectivitychange for vertex in self)
    )
    if not stale:
        return

    # clear all the labels
    for vertex in self:
        vertex.label = None
        vertex._connectivitychange = False

    ncomponents = 0
    for seed in self:
        if seed.label is not None:
            # already reached from an earlier seed
            continue

        # flood the component containing this seed
        stack = [seed]
        while stack:
            vertex = stack.pop()
            vertex.label = ncomponents
            stack.extend(n for n in vertex.neighbours() if n.label is None)
        ncomponents += 1

    self._ncomponents = ncomponents
    self._ncolored = self.n
    self._connectivitychange = False
class DGraph(PGraph):
    """
    Class for directed graphs

    .. inheritance-diagram:: DGraph
    """

    def add_vertex(self, coord=None, name=None):
        """
        Add vertex to directed graph

        :param coord: coordinate for an embedded graph, defaults to None
        :type coord: array-like, optional
        :param name: vertex name, defaults to "#i"
        :type name: str, optional
        :return: new vertex
        :rtype: DVertex

        - ``g.add_vertex()`` creates a new vertex with optional ``coord`` and
          ``name``.
        - ``g.add_vertex(v)`` takes an instance of any ``Vertex`` subclass
          and adds it to the graph
        """
        # the first argument doubles as "an existing vertex to add"
        if not isinstance(coord, Vertex):
            vertex = DVertex(coord=coord, name=name)
        else:
            vertex = coord

        super().add_vertex(vertex, name=name)
        return vertex
@classmethod
def vertex_copy(self, vertex):
    """
    Create a new directed-graph vertex from an existing vertex

    :param vertex: vertex to take the coordinate and name from
    :type vertex: Vertex subclass
    :return: new vertex, not attached to any graph by this method
    :rtype: DVertex
    """
    duplicate = DVertex(coord=vertex.coord, name=vertex.name)
    return duplicate
def _graphcolor(self): """ Color the graph Performs a depth-first labeling operation, assigning the ``label`` attribute of every vertex with a sequential integer starting from 0. This method checks the ``_connectivitychange`` attribute of all vertices and if any are True it will perform the coloring operation. This flag is set True by any operation that adds or removes a vertex or edge. :seealso: :meth:`nc` """ if self._connectivitychange or any([n._connectivitychange for n in self]): # color the graph # clear all the labels for vertex in self: vertex.label = None vertex._connectivitychange = False # initial labeling pass merge = {} nextlabel = 1 for v in self: if v.label is None: # no label, try to inherit one from a neighbour for n in v.neighbours(): if n.label is not None: # neighbour has a label v.label = n.label break if v.label is None: # still not labeled, assign a new label v.label = nextlabel nextlabel += 1 # now look for clashes for n in v.neighbours(): if n.label is None: # neighbour has no label, give it this one n.label = v.label elif v.label != n.label: # label clash, note it for merging merge[n.label] = v.label # merge labels and find unique labels unique = set() for v in self: while v.label in merge: v.label = merge[v.label] unique.add(v.label) final = {u: i for i, u in enumerate(unique)} for v in self: v.label = final[v.label] self._ncomponents = len(unique)
# ========================================================================== #
class Edge:
    """
    Edge class

    Is used to represent directed and undirected edges.

    Each edge has:

    - ``cost`` cost of traversing this edge, required for planning methods
    - ``data`` reference to arbitrary data associated with the edge
    - ``v1`` first vertex, start vertex for a directed edge
    - ``v2`` second vertex, end vertex for a directed edge

    .. note::

        - An undirected graph is created by having a single edge object in
          the edgelist of _each_ vertex.
        - This class can be inherited to provide user objects with graph
          capability.
        - Inheritance is an alternative to providing arbitrary user data.

    An Edge points to a pair of vertices.  At ``connect`` time the vertices
    get references back to the Edge object.

    ``graph.add_edge(v1, v2)`` calls ``v1.connect(v2)``
    """

    def __init__(self, v1=None, v2=None, cost=None, data=None):
        """
        Create an edge object

        :param v1: start of the edge, defaults to None
        :type v1: Vertex subclass, optional
        :param v2: end of the edge, defaults to None
        :type v2: Vertex subclass, optional
        :param cost: edge cost, defaults to None
        :type cost: any, optional
        :param data: edge data, defaults to None
        :type data: any, optional

        Creates an edge but does not connect it to the vertices or add it
        to the graph.

        If ``cost`` is not given, and both vertices are given and have
        associated coordinates, the edge cost will be computed according to
        the distance measure associated with the graph.  Otherwise the cost
        is None.

        ``data`` is a way of associating any object with the edge, its
        value can be found as the ``.data`` attribute of the edge.  An
        alternative approach is to subclass the ``Edge`` class.

        .. note:: To compute edge cost from the vertices, the vertices must
            have been added to the graph.

        :seealso: :meth:`Edge.connect` :meth:`Vertex.connect`
        """
        self.v1 = v1
        self.v2 = v2
        self.data = data

        # try to compute edge cost as metric distance if not given
        if cost is not None:
            self.cost = cost
        elif all(v is not None and v.coord is not None for v in (v1, v2)):
            # the metric belongs to the graph that owns the first vertex
            self.cost = v1._graph.metric(v1.coord - v2.coord)
        else:
            self.cost = None

    def __repr__(self):
        return str(self)

    def __str__(self):
        # the cost may be None (unknown) or an arbitrary object, neither of
        # which accepts a float format specifier
        try:
            cost = f"{self.cost:.4g}"
        except (TypeError, ValueError):
            cost = str(self.cost)

        s = f"Edge{{{self.v1} -- {self.v2}, cost={cost}}}"
        if self.data is not None:
            s += f" data={self.data}"
        return s

    def connect(self, v1, v2):
        """
        Add edge to the graph

        :param v1: start of the edge
        :type v1: Vertex subclass
        :param v2: end of the edge
        :type v2: Vertex subclass
        :raises ValueError: a vertex does not belong to a graph, or the
            vertices belong to different graphs

        The edge is added to the graph and connects vertices ``v1`` and
        ``v2``.

        .. note:: The vertices must already be added to the graph.
        """
        if v1._graph is None:
            raise ValueError("vertex v1 does not belong to a graph")
        if v2._graph is None:
            raise ValueError("vertex v2 does not belong to a graph")
        if v1._graph is not v2._graph:
            raise ValueError("vertices must belong to the same graph")

        # connect edge to its vertices
        self.v1 = v1
        self.v2 = v2

        # tell the vertices to add edge to their edgelists as appropriate
        # for DGraph or UGraph
        v1.connect(v2, edge=self)

    def next(self, vertex):
        """
        Return other end of an edge

        :param vertex: one vertex on the edge
        :type vertex: Vertex subclass
        :raises ValueError: ``vertex`` is not on the edge
        :return: the other vertex on the edge
        :rtype: Vertex subclass

        ``e.next(v1)`` is the vertex at the other end of edge ``e``, ie.
        the vertex that is not ``v1``.
        """
        if self.v1 is vertex:
            return self.v2
        if self.v2 is vertex:
            return self.v1
        raise ValueError("vertex is not an endpoint of this edge")

    def vertices(self):
        """
        Deprecated, use :meth:`endpoints` instead

        :raises DeprecationWarning: always
        """
        raise DeprecationWarning("use endpoints instead")

    @property
    def endpoints(self):
        """
        Vertices at the ends of the edge

        :return: the two vertices ``[v1, v2]``
        :rtype: list of Vertex subclass
        """
        return [self.v1, self.v2]
# def remove(self): # """ # Remove edge from graph # ``e.remove()`` removes ``e`` from the graph, but does not delete the # edge object. # """ # # remove this edge from the edge list of both end vertices # if self in self.v1._edgelist: # self.v1._edgelist.remove(self) # if self in self.v2._edgelist: # self.v2._edgelist.remove(self) # # indicate that connectivity has changed # self.v1._connectivitychange = True # self.v2._connectivitychange = True # # remove references to the vertices # self.v1 = None # self.v2 = None # ========================================================================== #
class Vertex:
    """
    Superclass for vertices of directed and non-directed graphs.

    Each vertex has:

    - ``name``
    - ``label`` an int indicating which graph component contains it
    - ``_edgelist`` a list of edge objects that connect this vertex to
      others
    - ``coord`` the coordinate in an embedded graph (optional)
    """

    def __init__(self, coord=None, name=None):
        """
        Create a vertex

        :param coord: coordinate for an embedded graph, defaults to None
        :type coord: array-like, optional
        :param name: vertex name, defaults to None
        :type name: str, optional

        The vertex initially has no edges and does not belong to a graph.
        """
        if coord is None:
            self.coord = None
        else:
            self.coord = np.r_[coord]
        self.name = name
        self.label = None
        self._connectivitychange = True
        self._edgelist = []
        self._graph = None  # reference to owning graph

    def __str__(self):
        # no ``:s`` format spec, the name is None until one is assigned
        return f"[{self.name}]"

    def __repr__(self):
        if self.coord is None:
            coord = "?"
        else:
            coord = ", ".join([f"{x:.4g}" for x in self.coord])
        return f"{self.__class__.__name__}[{self.name}, coord=({coord})]"

    def copy(self, cls=None):
        """
        Copy of vertex

        :param cls: class whose ``vertex_copy`` method is used to create
            the copy, defaults to None
        :type cls: class with a ``vertex_copy`` method, optional
        :return: new vertex with the same ``coord`` and ``name``
        :rtype: Vertex subclass

        If ``cls`` is not given the copy is of the same class as this
        vertex.
        """
        if cls is not None:
            return cls.vertex_copy(self)
        return self.__class__(coord=self.coord, name=self.name)

    def neighbours(self):
        """
        Neighbours of a vertex

        ``v.neighbours()`` is a list of neighbours of this vertex.

        .. note:: For a directed graph the neighbours are those on edges
            leaving this vertex
        """
        return [e.next(self) for e in self._edgelist]

    def neighbors(self):
        """
        Neighbors of a vertex

        ``v.neighbors()`` is a list of neighbors of this vertex.

        .. note:: For a directed graph the neighbours are those on edges
            leaving this vertex
        """
        return [e.next(self) for e in self._edgelist]

    def isneighbour(self, vertex):
        """
        Test if vertex is a neighbour

        :param vertex: vertex reference
        :type vertex: Vertex subclass
        :return: true if a neighbour
        :rtype: bool

        For a directed graph this is true only if the edge is from ``self``
        to ``vertex``.
        """
        return vertex in [e.next(self) for e in self._edgelist]

    def incidences(self):
        """
        Neighbours and edges of a vertex

        ``v.incidences()`` is a list of incidences, tuples of
        (vertex, edge), for all neighbours of the vertex ``v``.

        .. note:: For a directed graph the edges are those leaving this
            vertex
        """
        return [(e.next(self), e) for e in self._edgelist]

    def connect(self, dest, edge=None, cost=None, data=None):
        """
        Connect two vertices with an edge

        :param dest: The vertex to connect to
        :type dest: ``Vertex`` subclass
        :param edge: Use this as the edge object, otherwise a new ``Edge``
            object is created from the vertices being connected, and the
            ``cost`` and ``data`` parameters, defaults to None
        :type edge: ``Edge`` subclass, optional
        :param cost: the cost to traverse this edge, defaults to None
        :type cost: float, optional
        :param data: reference to arbitrary data associated with the edge,
            defaults to None
        :type data: Any, optional
        :raises TypeError: vertex types are different subclasses
        :return: the edge connecting the vertices
        :rtype: Edge

        ``v1.connect(v2)`` connects vertex ``v1`` to vertex ``v2``.

        This method registers the edge with the owning graph and flags the
        connectivity change, but does not touch the vertex edge lists; the
        ``UVertex`` and ``DVertex`` overrides do that.

        .. note::

            - If the vertices subclass ``UVertex`` the edge is undirected,
              and if they subclass ``DVertex`` the edge is directed.
            - Vertices must both be of the same ``Vertex`` subclass

        :seealso: :meth:`Edge`
        """
        # the two vertices are compared by the first base class of their
        # respective classes
        if dest.__class__.__bases__[0] is not self.__class__.__bases__[0]:
            raise TypeError("must connect vertices of same type")

        if isinstance(edge, Edge):
            e = edge
        else:
            e = Edge(self, dest, cost=cost, data=data)

        self._graph._edgelist.add(e)

        # connectivity has changed, the component labels are now stale
        self._graph._connectivitychange = True
        self._connectivitychange = True

        return e

    def edgeto(self, dest):
        """
        Get edge connecting vertex to specific neighbour

        :param dest: a neighbouring vertex
        :type dest: ``Vertex`` subclass
        :raises ValueError: ``dest`` is not a neighbour
        :return: the edge from this vertex to ``dest``
        :rtype: Edge

        .. note::

            - For a directed graph ``dest`` must be at the arrow end of the
              edge
        """
        for n, e in self.incidences():
            if n is dest:
                return e
        raise ValueError("dest is not a neighbour")

    def edges(self):
        """
        All outgoing edges of vertex

        :return: List of all edges leaving this vertex
        :rtype: list of Edge

        .. note::

            - For a directed graph the edges are those leaving this vertex
            - For a non-directed graph the edges are those leaving or
              entering this vertex
        """
        return self._edgelist

    def heuristic_distance(self, v2):
        """
        Heuristic distance between vertices

        :param v2: the other vertex
        :type v2: Vertex subclass
        :return: heuristic distance
        :rtype: float

        The owning graph's ``heuristic`` is applied to the difference of
        the two vertex coordinates.
        """
        return self._graph.heuristic(self.coord - v2.coord)

    def distance(self, coord):
        """
        Distance from vertex to point

        :param coord: coordinates of the point
        :type coord: ndarray(n) or Vertex
        :return: distance
        :rtype: float

        Distance is computed according to the graph's metric.

        :seealso: :meth:`metric`
        """
        if isinstance(coord, Vertex):
            coord = coord.coord
        return self._graph.metric(self.coord - coord)

    @property
    def degree(self):
        """
        Degree of vertex

        :return: degree of the vertex
        :rtype: int

        Returns the number of edges connected to the vertex.

        .. note:: For a ``DGraph`` only outgoing edges are considered.

        :seealso: :meth:`edges`
        """
        return len(self.edges())

    @property
    def x(self):
        """
        The x-coordinate of an embedded vertex

        :return: The x-coordinate
        :rtype: float
        """
        return self.coord[0]

    @property
    def y(self):
        """
        The y-coordinate of an embedded vertex

        :return: The y-coordinate
        :rtype: float
        """
        return self.coord[1]

    @property
    def z(self):
        """
        The z-coordinate of an embedded vertex

        :return: The z-coordinate
        :rtype: float
        """
        return self.coord[2]

    def closest(self):
        """
        Result of the owning graph's ``closest`` query for this vertex

        :return: whatever ``graph.closest(coord)`` returns for this
            vertex's coordinate
        """
        return self._graph.closest(self.coord)
class UVertex(Vertex):
    """
    Vertex subclass for undirected graphs

    This class can be inherited to provide user objects with graph
    capability.

    .. inheritance-diagram:: UVertex
    """

    def connect(self, other, **kwargs):
        """
        Connect this vertex to another with an undirected edge

        :param other: the vertex to connect to, or an edge that already has
            this vertex as one of its endpoints
        :type other: ``Vertex`` subclass or ``Edge``
        :param kwargs: passed to :meth:`Vertex.connect` when ``other`` is a
            vertex (``edge``, ``cost``, ``data``)
        :raises TypeError: ``other`` is neither a vertex nor an edge
        :raises ValueError: ``other`` is an edge that this vertex is not an
            endpoint of
        :return: the edge connecting the vertices
        :rtype: Edge

        The edge is added to the edge list of both vertices.
        """
        if isinstance(other, Vertex):
            dest = other
            e = super().connect(dest, **kwargs)
        elif isinstance(other, Edge):
            # the destination is whichever end of the edge is not this
            # vertex
            dest = other.next(self)
            e = super().connect(dest, edge=other)
        else:
            raise TypeError("bad argument")

        # undirected: the one edge object is listed by both end vertices
        self._edgelist.append(e)
        dest._edgelist.append(e)
        self._graph._edgelist.add(e)
        return e
class DVertex(Vertex):
    """
    Vertex subclass for directed graphs

    This class can be inherited to provide user objects with graph
    capability.

    .. inheritance-diagram:: DVertex
    """

    def connect(self, other, **kwargs):
        """
        Connect this vertex to another with a directed edge

        :param other: the vertex to connect to, or an edge whose start
            vertex ``v1`` is this vertex
        :type other: ``Vertex`` subclass or ``Edge``
        :param kwargs: passed to :meth:`Vertex.connect` when ``other`` is a
            vertex (``edge``, ``cost``, ``data``)
        :raises TypeError: ``other`` is neither a vertex nor an edge
        :raises ValueError: ``other`` is an edge that does not start at
            this vertex
        :return: the edge connecting the vertices
        :rtype: Edge

        The edge is added to the edge list of this vertex only, since it
        is the start of the directed edge.
        """
        if isinstance(other, Vertex):
            e = super().connect(other, **kwargs)
        elif isinstance(other, Edge):
            # a directed edge can only be attached at its start vertex
            if other.v1 is not self:
                raise ValueError("edge does not start at this vertex")
            e = super().connect(other.v2, edge=other)
        else:
            raise TypeError("bad argument")

        self._edgelist.append(e)
        return e

    def remove(self):
        """
        Drop this vertex's references to its edges

        Sets the edge list to None; no other vertex, edge or graph state
        is changed.
        """
        self._edgelist = None  # remove all references to edges
if __name__ == "__main__":
    # minimal smoke test: print the summary of an empty undirected graph,
    # then again after adding ten vertices and a single edge
    g = UGraph()
    print(g)

    for _ in range(10):
        g.add_vertex()

    g.add_edge(g[0], g[1])
    print(g)