Improved bidirectional BIT path planning algorithm, bi-BIT python reproduces bidirectional Batch Informed Tree

Not much to say, let’s go directly to the code and reproduce it in pure python code.

#!/usr/bin/env python3
#coding:utf-8


import copy
import math
import platform
import random
import time
import numpy as np
import queue

import matplotlib.pyplot as plt

show_animation = True

def _dis(x1,x2):
    return np.linalg.norm(np.array(x1)-np.array(x2))
    

def radius(q):
    # zeta = math.pi**(self.dimension/2)/GAMMA_N[self.dimension-1]
    # lamb = math.pi**(self.dimension/2)/GAMMA_N[self.dimension-1]*cMax/2*(math.sqrt(cMax**2-self.cMin**2)/2)* *(self.dimension-1)
    # return self.eita*(math.log(q)/q*lamb/zeta)**(1/self.dimension)
    return 30.0 * math.sqrt((math.log(q) / q))

#map
class Map:
    def __init__(self,dim=2,obs_num=20,obs_size_max=2.5,xinit=[0,0],xgoal=[23,23],randMax=[30,30],randMin=[-5,-5] ):
        self.dimension = dim
        self.xinit = xinit
        self.xgoal = xgoal
        self.randMax = randMax
        self.randMin = randMin
        self.obstacles = []
        self.DISCRETE = 0.05

        # obstacles
        for i in range(obs_num):
            #TODO
            ob = []
            for j in range(dim):
                ob.append(random.random()*20 + 1.5)
            ob.append(random.random()*obs_size_max + 0.2)
            self.obstacles.append(ob)

        #informed
        self.cMin = _dis(self.xinit,self.xgoal)
        self.xCenter = (np.array(xinit) + np.array(xgoal))/2
        a1 = np.transpose([(np.array(xgoal)-np.array(xinit))/self.cMin])
        # first column of identity matrix transposed
        id1_t = np.array([1.0] + [0.0,]*(self.dimension-1)).reshape(1,self.dimension)
        M = a1@id1_t
        U, S, Vh = np.linalg.svd(M, 1, 1)
        self.C = np.dot(np.dot(U,
            np.diag([1.0,]*(self.dimension-1) + [np.linalg.det(U) * np.linalg.det(np.transpose(Vh))]))
            , Vh)

    def collision(self,x):
        for ob in self.obstacles:
            if _dis(x,ob[:-1])<=ob[-1]:
                return True
        return False

    def collisionLine(self,x1,x2):
        dis = _dis(x1,x2)
        if dis<self.DISCRETE:
            return False
        nums = int(dis/self.DISCRETE)
        direction = (np.array(x2)-np.array(x1))/_dis(x1,x2)
        for i in range(nums + 1):
            x = np.add(x1, i*self.DISCRETE*direction)
            if self.collision(x): return True
        if self.collision(x2): return True
        return False


    def randomSample(self):
        x = []
        for j in range(self.dimension):
            x.append(random.random()*(self.randMax[j]-self.randMin[j]) + self.randMin[j])
        return x
    def freeSample(self):
        x = self.randomSample()
        while self.collision(x):
            x = self.randomSample()
        return x
    def informedSample(self,cMax):
        L = np.diag([cMax/2] + [math.sqrt(cMax**2-self.cMin**2)/2,]*(self.dimension-1))
        cl = np.dot(self.C,L)
        x = np.dot(cl,self.ballSample()) + self.xCenter
        while self.collision(x):
            x = np.dot(cl,self.ballSample()) + self.xCenter
        return list(x)
    def ballSample(self):
        ret = []
        for i in range(self.dimension):
            ret.append(random.random()*2-1)
        ret = np.array(ret)
        return ret/np.linalg.norm(ret)*random.random()

    
    def drawMap(self):
        if self.dimension==2:
            plt.clf()
            sysstr = platform.system()
            if(sysstr == "Windows"):
                scale=18
            elif(sysstr == "Linux"):
                scale=24
            else: scale = 24
            for (ox, oy, size) in self.obstacles:
                plt.plot(ox, oy, "ok", ms=scale * size)
            
            plt.plot(self.xinit[0],self.xinit[1], "xr")
            plt.plot(self.xgoal[0],self.xgoal[1], "xr")
            plt.axis([self.randMin[0]-2,self.randMax[0] + 2, self.randMin[1]-2,self.randMax[1] + 2])
            plt.grid(True)

### main algorithm

## main Class
class BiBITstar(object):
    def __init__(self,_map,maxIter =200, bn=5,connFactor=0.0):
        self.map = _map
        self.batchSize = bn
        self.maxIter = maxIter
        self.bestCost = float('inf')
        self.bestConn = None

        self.x = [_map.xinit,_map.xgoal] # store all the point(samples, vertices)
        self.r = float('inf')

        self.qe = queue.PriorityQueue() # ecost,[vtind, xind]
        self.qv = queue.PriorityQueue() # the index in Tree
        self.vold = [] # the index in Tree
        self.Tree = [0,1]
        # self.Gtree = [0]
        # self.Htree = [1]
        self.X_sample = []
        # because python always copy a vertex for me ... :(
        self.isGtree = [True,False] # accroding to the order in Tree
        self.cost = [0,0] # accroding to the order in Tree
        self.parent = [None,None] # accroding to the order in tree
        self.children = [[],[]] # accroding to the order in Tree
        self.depth = [0,0] # accroding to the order in Tree
        self.conn = {}

        self.pruneNum = 0

        self.qv.put((self.distance(0,1),0))
        self.qv.put((self.distance(0,1),1))

        # if show_animation:
        # self.map.drawMap()
    
    def qeAdd(self,vt,x):
        self.qe.put((self.edgeQueueValue(vt,x),[vt,x]))

    def bestInQv(self):
        vc,vmt = self.qv.get()
        self.qv.put((vc,vmt))
        return (vc,vmt)
    def bestInQe(self):
        ec,[wmt,xm] = self.qe.get()
        self.qe.put((ec,[wmt,xm]))
        return (ec,[wmt,xm])

    def getCost(self,ind):
        try:
            tind = self.Tree.index(ind)
            cost = self.cost[tind]
            return cost
        except:
            return float('inf')

    def vertexQueueValue(self,vt):
        if self.bestCost < float('inf'):
            return self.cost[vt] + self.costTjHeuristicVertex(vt)
        v = self.Tree[vt]
        vn,dis = self.nearest(v,not self.isGtree[vt])
        return dis
        
    def edgeQueueValue(self,wt,x):
        if self.bestCost < float('inf'):
            return self.cost[wt] + self.distance(self.Tree[wt],x) + self.costTgHeuristic(x,self.isGtree[wt])
        vn,dis = self.nearest(x,self.isGtree[wt]) # !!problem: can't update during a batch
        return self.distance(self.Tree[wt],x) + dis
        
    def lowerBoundHeuristicEdge(self,vt,x):
        return self.costFgHeuristic(self.Tree[vt],not self.isGtree[vt]) + \
                    self.costFgHeuristic(x, self.isGtree[vt]) + \
                        self.distance(self.Tree[vt],x)
    def lowerBoundHeuristicVertex(self,x):
        x = self.Tree[x]
        return self.costFgHeuristic(x,True) + self.costFgHeuristic(x,False)
    def lowerBoundHeuristic(self,x):
        return self.costFgHeuristic(x,True) + self.costFgHeuristic(x,False)
    def costFgHeuristic(self,x,h=False):
        if h: target = 1
        else: target = 0
        return self.distance(target,x)

    def costTgHeuristic(self,ind,h=False):
        # if h:
        # Vnearest = self.nearest(ind,False)
        #else:
        # Vnearest = self.nearest(ind,True)
        Vnearest,nearDis = self.nearest(ind,not h)
        return self.cost[Vnearest] + nearDis

    def costTjHeuristicVertex(self,vt,i=False):
        if i:
            return self.cost(vt)
        else:
            return self.costTgHeuristic(self.Tree[vt],self.isGtree[vt])
    def costTjVertex(self,vertex,i=False):
        if i:
            return self.getCost(vertex)
        else:
            try:
                vcon = self.conn[vertex]
                return self.cost[vcon] + self.distance(vertex,vcon)
            except:
                return float('inf')

    """return treeIndx"""
    def nearest(self,indx, inGtree):
        nearestDis = float('inf')
        vn = None
        for vt in range(len(self.Tree)):
            if(self.isGtree[vt] == inGtree):
                dis = self.distance(self.Tree[vt],indx)
                if dis < nearestDis:
                    vn = vt
                    nearestDis = dis
        return vn,nearestDis

    def distance(self,ind1,ind2):
        return np.linalg.norm(np.array(self.x[ind1]) - np.array(self.x[ind2]))

    def solve(self):
        for iterateNum in range(self.maxIter):
            print("iter: ",iterateNum)
            if show_animation:
                self.drawGraph()
            if self.isEmpty():
                print("newBatch")
                self.newBatch()
            else:
                ec,[wmt,xm] = self.qe.get()
                wm = self.Tree[wmt]
                if self.lowerBoundHeuristicEdge(wmt,xm) > self.bestCost:
                    # end it.
                    while not self.qe.empty():
                        self.qe.get()
                    while not self.qv.empty():
                        vc,vt = self.qv.get()
                        self.vold.append(vt)
                    continue
                
                ## ExpandEdge
                if self.collisionEdge(wm,xm):
                    continue
                # it's a simple demo, we don't care too much about time-cost
                # if there's no collision, we add this edge.
                trueEdgeCost = self.distance(wm,xm)
                try:
                    xmt = self.Tree.index(xm)
                    isG = self.isGtree[xmt]
                    # same tree
                    ## delay rewire?
                    if isG == self.isGtree[wmt]:
                        if self.cost[wmt] + trueEdgeCost >= self.cost[xmt]:
                            continue
                        oldparent = self.parent[xmt]
                        self.children[oldparent].remove(xmt)
                        self.parent[xmt] = wmt
                        self.children[wmt].append(xmt)
                        self.cost[xmt] = self.cost[wmt] + trueEdgeCost # has not update the children TODO
                        self.depth[xmt] = self.depth[wmt] + 1

                    # another tree
                    else:
                        try:
                            wcont = self.conn[wmt]
                            if self.cost[wcont] + self.distance(wm,self.Tree[wcont]) <= \
                                self.cost[xmt] + self.distance(wm,xm):
                                continue
                            self.conn.pop(wcont)
                        except:
                            pass
                        try:
                            xcont = self.conn[xmt]
                            if self.cost[xcont] + self.distance(xm,self.Tree[xcont]) <= \
                                self.cost[wmt] + self.distance(xm,wm):
                                continue
                            self.conn.pop(xcont)
                        except:
                            pass
                        # update or create one
                        self.conn[wmt] = xmt
                        self.conn[xmt] = wmt
                        newCost = self.cost[wmt] + self.cost[xmt] + self.distance(wm,xm)
                        if newCost < self.bestCost:
                            self.bestCost = newCost
                            # report?
                            self.bestConn = [wmt,xmt]
                            if self.bestCost == self.map.cMin:
                                break
                # v->sample
                except:
                    xmt = len(self.Tree)
                    self.Tree.append(xm)
                    self.isGtree.append(self.isGtree[wmt])
                    self.parent.append(wmt)
                    self.children[wmt].append(xmt)
                    self.children.append([])
                    self.cost.append(self.cost[wmt] + trueEdgeCost)
                    self.depth.append(self.depth[wmt] + 1)
                    self.X_sample.remove(xm)
                    self.qv.put((self.vertexQueueValue(xmt),xmt))

        if self.bestCost == float('inf'):
            print("plan failed")
        else:
            if show_animation:
                path = self.getPath()
                plt.plot([self.x[ind][0] for ind in path], [self.x[ind][1] for ind in path], '-o')
                plt.show()
            print("plan finished with cost: ",self.bestCost)
        # print plan information
        gnum = 0
        for v in range(len(self.Tree)):
            if self.isGtree[v]:
                gnum + = 1
        print("Plan Info:")
        print("total samples:",len(self.x),"Gtree:",gnum,"Htree:",len(self.Tree)-gnum)
        print("edge num:",len(self.parent),"pruned:",self.pruneNum,"(sample:",len(self.x)-len(self.X_sample)-len(self.Tree) ,")")
        ## TODO more informations?

    ## ---
    # while BestQueueValue(Qv) <= BestQueueValue(Qe):
    #ExpandVertex(BestValueIn(Qv))
    def isEmpty(self):
        while not self.qv.empty():
            if self.qe.empty():
                self.expandVertex()
            else:
                vcost,vmt = self.bestInQv()
                ecost,[wmt,xm] = self.bestInQe()
                if(ecost>=vcost):
                    self.expandVertex()
                else:
                    break
        while self.qe.empty() and not self.qv.empty():
            self.expandVertex()

        return self.qe.empty()

    # f_hat(v,x) < bestCost
    def edgeInsertConditionSample(self,vt,xind):
        return self.lowerBoundHeuristicEdge(vt,xind) < self.bestCost
    # f_hat(v,x) < bestCost AND (better solution)
    # Ti_hat(v) + c(v,x) < Ti(x) (optimal tree)
    def edgeInsertConditionSameTree(self,vt,ivt):
        if self.parent[vt] == ivt:
            return False
        if self.parent[ivt] == vt:
            return False
        v = self.Tree[vt]
        iv = self.Tree[ivt]
        costTargetHeuristic = self.costFgHeuristic(v,not self.isGtree[vt]) + \
                                self.distance(v,iv)
        return costTargetHeuristic < self.cost[ivt] and \
                self.costFgHeuristic(iv, self.isGtree[vt]) + \
                    costTargetHeuristic < self.bestCost
    def edgeInsertConditionAnotherTree(self,vt,jvt):
        v = self.Tree[vt]
        jv = self.Tree[jvt]
        cvx = self.distance(v,jv)
        if self.costFgHeuristic(v,not self.isGtree[vt]) + \
                self.costFgHeuristic(jv, self.isGtree[vt]) + \
                    cvx >= self.bestCost:
            return False
        # if is better than current connEdge
        try:
            vcont = self.conn[vt]
            if vcont == jvt or self.cost[jvt] + cvx > self.cost[vcont] + self.distance(self.Tree[vcont],v):
                return False
        except:
            pass
        try:
            jcont = self.conn[jvt]
            if jcont == vt or self.cost[vt] + cvx > self.cost[jcont] + self.distance(self.Tree[jcont],jv):
                return False
        except:
            pass
        return True

    def expandVertex(self):
        (vcost,vt) = self.qv.get()
        self.vold.append(vt)
        if self.lowerBoundHeuristicVertex(vt) > self.bestCost:
            while not self.qv.empty():
                vc,vt = self.qv.get()
                self.vold.append(vt)
        else:
            ## expand vertex
            # expand to free sample
            v = self.Tree[vt]
            xnearby = self.nearby(v,self.X_sample)
            for xind in xnearby:
                if self.edgeInsertConditionSample(vt,xind):
                    self.qeAdd(vt,xind)

            ## expand to tree
            # expand to the same tree
            # delay rewire?
            if self.bestCost < float('inf'):
                inear = self.nearbyT(v,self.isGtree[vt])
                for ivt in inear:
                    if self.edgeInsertConditionSameTree(vt,ivt):
                        self.qeAdd(vt,self.Tree[ivt])
            # expand to another tree
            jnear = self.nearbyT(v,not self.isGtree[vt])
            for jvt in jnear:
                if self.edgeInsertConditionAnotherTree(vt,jvt):
                    # TODO if there's no solution, should we give some reward?
                    self.qeAdd(vt,self.Tree[jvt])

    """
    return nearby(self.r) x in thelist
    """
    def nearby(self,vind,thelist):
        near = []
        for ind in thelist: # too violent... try next neighbor...
            if self.distance(ind,vind) < self.r:
                near.append(ind)
        return near
    def nearbyT(self,vind,isG):
        near = []
        for ti in range(len(self.Tree)):
            if self.isGtree[ti] == isG:
                if self.distance(vind, self.Tree[ti]) < self.r:
                    near.append(ti)
        return near

    def sample(self,c):
        if c == float('inf'):
            for i in range(self.batchSize):
                self.X_sample.append(len(self.x))
                self.x.append(self.map.freeSample())
        else:
            for i in range(self.batchSize):
                self.X_sample.append(len(self.x))
                self.x.append(self.map.informedSample(c))
    def collisionEdge(self,vind,xind):
        return self.map.collisionLine(self.x[vind],self.x[xind])

    def newBatch(self):
        while not self.qv.empty():
            vc,vt = self.qv.get()
            self.vold.append(vt)
        while not self.qe.empty():
            self.qe.get()
        self.updateCost()
        self.prune()
        self.sample(self.bestCost)
        self.r = radius(len(self.x))
        while len(self.vold) > 0:
            vt = self.vold.pop()
            self.qv.put((self.vertexQueueValue(vt),vt))
    
    # update the cost of vertex (might be out-of-date because of rewire)
    def updateCost(self,prune = False):
        waitingToUpdate = queue.Queue()
        for cd in self.children[0]:
            waitingToUpdate.put(cd)
        for cd in self.children[1]:
            waitingToUpdate.put(cd)
        while not waitingToUpdate.empty():
            curV = waitingToUpdate.get()
            self.cost[curV] = self.cost[self.parent[curV]] + self.distance(self.Tree[curV],self.Tree[self.parent[curV]])
            for cd in self.children[curV]:
                waitingToUpdate.put(cd)
        if self.bestCost < float('inf'):
            self.bestCost = self.cost[self.bestConn[0]] + self.cost[self.bestConn[1]]\
                 + self.distance(self.Tree[self.bestConn[0]],self.Tree[self.bestConn[1]])


    def prune(self):
        # if prune ...
        if self.bestCost < float('inf'):
            # self.updateCost(prune=True)
            for x in self.X_sample:
                if self.lowerBoundHeuristic(x) > self.bestCost:
                    self.X_sample.remove(x)
                    self.pruneNum + = 1
            pruneVertices = []
            for vt in range(len(self.Tree)):
                if self.Tree[vt] == None:
                    continue
                if self.lowerBoundHeuristicVertex(vt) > self.bestCost:
                    self.deleteVertex(vt,pruneVertices)
            self.pruneNum + = len(pruneVertices)
            pruneVertices.sort(reverse=True)
            for vtp in pruneVertices:
                try:
                    vtcon = self.conn[vtp]
                    self.conn.pop(vtcon)
                    self.conn.pop(vtp)
                except:
                    pass
                self.vold.remove(vtp)
                self.children[vtp] = None # if children have children?
                self.Tree[vtp] = None
                self.isGtree[vtp] = None
                self.cost[vtp] = None
                self.parent[vtp] = None
                self.depth[vtp] = None
            #pruneVertices.sort()
            # for i in range(len(self.vold)):
            # for prv in pruneVertices:
            # if self.vold[i] > prv:
            # self.vold[i] -= 1
            # else: break
                    
    def deleteVertex(self,vt,pruneVertices):
        while len(self.children[vt]):
            print("waring/debug: prune a vertex which has children")
            cdt = self.children[vt][-1]
            self.deleteVertex(cdt,pruneVertices)
            if self.Tree[cdt] != None and self.lowerBoundHeuristicVertex(cdt) < self.bestCost:
                self.X_sample.append(self.Tree[cdt])
        # # mark as pruned
        pruneVertices.append(vt)
        self.Tree[vt] = None
        pt = self.parent[vt]
        self.children[pt].remove(vt)
        

    def getPath(self):
        reversePath = []
        if self.isGtree[self.bestConn[0]]:
            vg = self.bestConn[0]
            vh = self.bestConn[1]
        else:
            vg = self.bestConn[1]
            vh = self.bestConn[0]
        curV = vg
        if vg != 0:
            while self.parent[curV] != 0:
                reversePath.append(self.Tree[curV])
                curV = self.parent[curV]
            reversePath.append(self.Tree[curV])
        
        #reverse
        path = [0]
        while len(reversePath)>0:
            path.append(reversePath.pop())

        curV = vh
        if vh != 1:
            while self.parent[curV] != 1:
                path.append(self.Tree[curV])
                curV = self.parent[curV]
            path.append(self.Tree[curV])
        path.append(1)

        return path

    def drawGraph(self):
        plt.clf()
        if self.map.dimension == 2:
            self.map.drawMap()
            for xind in self.X_sample:
                plt.plot(self.x[xind][0],self.x[xind][1],'ob')

            for vt in range(len(self.Tree)):
                v = self.Tree[vt]
                if v == None:
                    continue
                cl = 'r'
                if self.isGtree[vt]:
                    cl = 'g'
                plt.plot(self.x[v][0],self.x[v][1],'o' + cl)
                if self.parent[vt]!=None:
                    plt.plot([self.x[v][0], self.x[self.Tree[self.parent[vt]]][0]],
                        [self.x[v][1], self.x[self.Tree[self.parent[vt]]][1]], '-' + cl)
            
            for vconnt in self.conn.keys():
                vconn = self.Tree[vconnt]
                vcon2 = self.Tree[self.conn[vconnt]]
                plt.plot([self.x[vconn][0], self.x[vcon2][0]],
                    [self.x[vconn][1], self.x[vcon2][1]], '-y')
                
        plt.pause(0.01)
        #plt.show()


if __name__ == '__main__':
    map2Drand = Map()
    bit = BiBITstar(map2Drand)
    # show map
    if show_animation:
        bit.map.drawMap()
        # plt.pause(10)
    start_time = time.time()
    bit.solve()
    print("time_use: ",time.time()-start_time)


        

The knowledge points of the article match the official knowledge files, and you can further learn related knowledge. Algorithm skill tree Home page Overview 56939 people are learning the system