Source code for sbp_env.planners.prmPlanner

from typing import List

import networkx as nx
import numpy as np
from overrides import overrides
import tqdm

from ..utils.common import Node
from ..planners.rrtPlanner import RRTPlanner
from ..samplers import prmSampler
from ..utils import planner_registry
from ..utils.common import Colour, Stats

volume_of_unit_ball = {
    1: 2,
    2: 3.142,
    3: 4.189,
    4: 4.935,
    5: 5.264,
    6: 5.168,
    7: 4.725,
    8: 4.059,
    9: 3.299,
    10: 2.550,
}


def nearest_neighbours(
    nodes: List[Node], poses: np.ndarray, pos: np.ndarray, radius: float
):
    """A helper function to find the nearest neighbours from a roadmap

    :param nodes: the list of nodes to search against
    :param poses: array of positions
    :param pos: the position of interest
    :param radius: the maximum radius of distance

    """
    distances = np.linalg.norm(poses[: len(nodes)] - pos, axis=1)
    neighbours = []
    for i, d in enumerate(distances):
        if d < radius:
            neighbours.append(nodes[i])
    return neighbours


[docs]class PRMPlanner(RRTPlanner): """ Probabilistic Roadmap motion planner, the multi-query sampling-based planner. """
[docs] @overrides def init(self, *argv, **kwargs): super().init(*argv, **kwargs) self.d_threshold = self.args.epsilon self.gamma = ( 1 + np.power(2, self.args.engine.get_dimension()) * (1 + 1.0 / self.args.engine.get_dimension()) * 10000 ) self.graph = nx.DiGraph() self.graph.add_node(self.args.env.start_pt) self.args.env.end_state = None
[docs] @overrides def run_once(self): rand_pos, _, _ = self.args.sampler.get_valid_next_pos() Stats.get_instance().add_free() self.add_newnode(Node(rand_pos))
[docs] def clear_graph(self): """Clear the current roadmap graph""" self.graph = nx.DiGraph() self.graph.add_node(self.args.env.start_pt) self.args.env.end_state = None
[docs] def build_graph(self): """Performs the graph building process where .. math:: G = (V, E). """ n = len(self.nodes) radius = self.gamma * np.power( np.log(n + 1) / (n + 1), 1 / self.args.engine.get_dimension() ) for v in tqdm.tqdm(self.nodes, desc="Building graph"): m_near = nearest_neighbours(self.nodes, self.poses, v.pos, radius) for m_g in m_near: if m_g is v: continue # check if path between(m_g,m_new) defined by motion-model is collision free if not self.args.engine.cc.visible(m_g.pos, v.pos): continue self.graph.add_weighted_edges_from( [(m_g, v, self.args.engine.dist(m_g.pos, v.pos))] )
[docs] def get_nearest_free(self, node: Node, neighbours: List[Node]): """Internal method to get the closest existing node that is free to connects to the given node. :param node: the node of interest :param neighbours: the list of nodes to search against """ nn = None min_cost = 999999 for n in neighbours: if n is self.args.env.start_pt or n is self.args.env.goal_pt or n is node: continue if not self.args.engine.cc.visible(node.pos, n.pos): continue if nn is None: nn = n min_cost = self.args.engine.dist(node.pos, n.pos) else: _cost = self.args.engine.dist(node.pos, n.pos) if _cost < min_cost: min_cost = _cost nn = n return nn
[docs] def get_solution(self): """Build the solution path""" # get two nodes that is cloest to start/goal and are free routes m_near = nearest_neighbours( self.nodes, self.poses, self.args.sampler.start_pos, self.args.epsilon ) start = self.get_nearest_free(self.args.env.start_pt, m_near) m_near = nearest_neighbours( self.nodes, self.poses, self.args.sampler.goal_pos, self.args.epsilon ) goal = self.get_nearest_free(self.args.env.goal_pt, m_near) if start is None or goal is None or not nx.has_path(self.graph, start, goal): return float("inf") solution_path = nx.shortest_path(self.graph, start, goal) solution_path[0].cost = self.args.engine.dist(solution_path[0].pos, start.pos) for i in range(1, len(solution_path)): solution_path[i].parent = solution_path[i - 1] solution_path[i].cost = solution_path[i - 1].cost + self.args.engine.dist( solution_path[i].pos, solution_path[i - 1].pos ) self.c_max = goal.cost self.args.env.goal_pt.parent = goal start.parent = self.args.env.start_pt self.visualiser.draw_solution_path() return self.c_max
def pygame_prm_planner_paint(planner): """Visualisation function to paint for planner :param planner: the planner to visualise """ for n in planner.nodes: planner.args.env.draw_circle( pos=n.pos, colour=(0, 0, 255), radius=1.4, layer=planner.args.env.path_layers, ) def pygame_prm_planner_paint_when_terminate(planner): """Visualisation function to paint for planner when termiante :param planner: the planner to visualise """ planner.build_graph() # draw all edges for n1, n2 in planner.graph.edges(): planner.args.env.draw_path(n1, n2, Colour.path_blue) planner.get_solution() planner.args.env.update_screen() input("\nPress Enter to quit...") def klampt_prm_paint(planner) -> None: """Visualiser paint function for PRM :param planner: the planner to be visualised """ colour = (1, 0, 0, 1) for n in planner.nodes: planner.args.env.draw_node( planner.args.engine.cc.get_eef_world_pos(n.pos), colour=colour ) for edge in planner.graph.edges: edge = np.array(edge).transpose() planner.args.env.draw_path( planner.args.engine.cc.get_eef_world_pos(n.pos), planner.args.engine.cc.get_eef_world_pos(n.parent.pos), colour=colour, ) def klampt_prm_planner_paint_when_terminate(planner): """Visualisation function to paint for planner when termiante :param planner: the planner to visualise """ planner.build_graph() # draw all edges for n1, n2 in planner.tree.edges(): planner.args.env.draw_path(n1, n2, Colour.path_blue) planner.get_solution() planner.args.env.update_screen() input("\nPress Enter to quit...") # start register planner_registry.register_planner( "prm", planner_class=PRMPlanner, visualise_pygame_paint=pygame_prm_planner_paint, visualise_pygame_paint_terminate=pygame_prm_planner_paint_when_terminate, visualise_klampt_paint=klampt_prm_paint, visualise_klampt_paint_terminate=klampt_prm_planner_paint_when_terminate, sampler_id=prmSampler.sampler_id, ) # finish register