Project 2 – A* Search and Bi-Directional A* Search
# coding=utf-8
"""
This file is your main submission that will be graded against. Only copy-paste
code on the relevant classes included here. Do not add any classes or functions
to this file that are not part of the classes that we want.
"""

import heapq
import os
import pickle
import math


class PriorityQueue(object):
    """
    A queue structure where each element is served in order of priority.

    Elements in the queue are popped based on the priority with higher priority
    elements being served before lower priority elements. If two elements have
    the same priority, they will be served in the order they were added to the
    queue.

    Callers pass and receive nodes as ``(priority, value)`` tuples. Internally
    every entry is stored as ``(priority, insertion_count, value)`` so that
    ties are broken by insertion order and the payloads themselves are never
    compared by the heap.

    Attributes:
        queue (list): Heap of ``(priority, insertion_count, value)`` entries.
        count (int): Number of nodes ever appended; used as the tie-breaker.
    """

    def __init__(self):
        """Initialize a new Priority Queue."""
        self.queue = []
        self.count = 0

    def pop(self):
        """
        Pop top priority node from queue.

        Returns:
            The ``(priority, value)`` node with the highest priority (i.e. the
            smallest priority number; oldest first on ties).

        Raises:
            IndexError: If the queue is empty.
        """
        priority, _, value = heapq.heappop(self.queue)
        return (priority, value)

    def remove(self, node):
        """
        Remove a node from the queue.

        If several stored entries match, the oldest one is removed.

        Args:
            node (tuple): The ``(priority, value)`` node to remove.

        Raises:
            ValueError: If no matching node is stored in the queue.
        """
        matches = [
            entry for entry in self.queue
            if entry[0] == node[0] and entry[2] == node[1]
        ]
        if not matches:
            raise ValueError("node not in priority queue: %r" % (node,))
        # The insertion counter is unique, so min() never compares payloads.
        self.queue.remove(min(matches))
        # Deleting from the middle of the list breaks the heap invariant.
        heapq.heapify(self.queue)

    def __iter__(self):
        """Iterate over ``(priority, value)`` nodes in the order served."""
        # Sort while the insertion counter is still attached so that ties keep
        # FIFO order and payloads are never compared against each other.
        return iter(
            [(priority, value) for priority, _, value in sorted(self.queue)]
        )

    def __str__(self):
        """Priority Queue to string (nodes listed in internal heap order)."""
        nodes = [(priority, value) for priority, _, value in self.queue]
        return 'PQ:%s' % nodes

    def append(self, node):
        """
        Append a node to the queue.

        Args:
            node (tuple): ``(priority, value)`` pair to add to the queue.
        """
        heapq.heappush(self.queue, (node[0], self.count, node[1]))
        self.count += 1

    def __contains__(self, key):
        """
        Containment Check operator for 'in'

        Args:
            key: The key (node value, not priority) to check for in the queue.

        Returns:
            True if key is found in queue, False otherwise.
        """
        return any(entry[-1] == key for entry in self.queue)

    def __eq__(self, other):
        """
        Compare this Priority Queue with another Priority Queue.

        Two queues are equivalent when they would serve the same
        ``(priority, value)`` nodes in the same order; the internal insertion
        counters are ignored.

        Args:
            other (PriorityQueue): Priority Queue to compare against.

        Returns:
            True if the two priority queues are equivalent.
        """
        if not isinstance(other, PriorityQueue):
            return NotImplemented
        return list(self) == list(other)

    def size(self):
        """
        Get the current size of the queue.

        Returns:
            Integer of number of items in queue.
        """
        return len(self.queue)

    def clear(self):
        """Reset queue to empty (no nodes)."""
        # The insertion counter is deliberately left alone; it only needs to
        # keep increasing for tie-breaking to stay correct.
        self.queue = []

    def top(self):
        """
        Get the top item in the queue.

        Returns:
            The ``(priority, value)`` node that ``pop`` would return next.

        Raises:
            IndexError: If the queue is empty.
        """
        priority, _, value = self.queue[0]
        return (priority, value)


def null_heuristic(graph, v, goal):
    """
    Null heuristic used as a base line.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        v (str): Key for the node to calculate from.
        goal (str): Key for the end node to calculate to.

    Returns:
        0
    """
    return 0


def euclidean_dist_heuristic(graph, v, goal):
    """
    Warm-up exercise: Implement the euclidean distance heuristic.

    See README.md for exercise description.

    Args:
        graph (ExplorableGraph): Undirected graph to search. Each node is
            looked up through ``graph.nodes[key]['pos']`` and only the first
            two coordinates are used.
        v (str): Key for the node to calculate from.
        goal (str): Key for the end node to calculate to.

    Returns:
        Euclidean distance between `v` node and `goal` node
    """
    v_pos = graph.nodes[v]['pos']
    goal_pos = graph.nodes[goal]['pos']
    return math.hypot(v_pos[0] - goal_pos[0], v_pos[1] - goal_pos[1])


def a_star(graph, start, goal, heuristic=euclidean_dist_heuristic):
    """
    Warm-up exercise: Implement A* algorithm.

    See README.md for exercise description.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        start (str): Key for the start node.
        goal (str): Key for the end node.
        heuristic: Function to determine distance heuristic.
            Default: euclidean_dist_heuristic.

    Returns:
        The best path as a list from the start and goal nodes (including both).
        ``[goal]`` when start and goal coincide, and an empty list when the
        goal cannot be reached from the start.
    """
    if start == goal:
        return [goal]

    frontier = PriorityQueue()
    frontier.append((heuristic(graph, start, goal), start))
    cost_so_far = {start: 0}   # cheapest known cost from start to each node
    came_from = {start: None}  # predecessor on that cheapest known path
    explored = set()

    while frontier.size() > 0:
        node = frontier.pop()[-1]
        if node == goal:
            # Walk the predecessor chain back to the start.
            path = []
            while node is not None:
                path.append(node)
                node = came_from[node]
            path.reverse()
            return path
        if node in explored:
            # Stale entry: this node was already expanded via a cheaper path.
            continue
        explored.add(node)

        for neighbor in graph.neighbors(node):
            if neighbor in explored:
                continue
            new_cost = (cost_so_far[node]
                        + graph.get_edge_data(node, neighbor)['weight'])
            if neighbor in cost_so_far and new_cost >= cost_so_far[neighbor]:
                continue  # not an improvement; keep the frontier small
            cost_so_far[neighbor] = new_cost
            came_from[neighbor] = node
            frontier.append(
                (new_cost + heuristic(graph, neighbor, goal), neighbor)
            )

    # Frontier exhausted without reaching the goal.
    return []


def bidirectional_a_star(graph, start, goal,
                         heuristic=euclidean_dist_heuristic):
    """
    Exercise 2: Bidirectional A*.

    See README.md for exercise description.

    Args:
        graph (ExplorableGraph): Undirected graph to search.
        start (str): Key for the start node.
        goal (str): Key for the end node.
        heuristic: Function to determine distance heuristic.
            Default: euclidean_dist_heuristic.

    Returns:
        The best path as a list from the start and goal nodes (including both).

    Raises:
        NotImplementedError: Always; this exercise has not been written yet.
    """
    # TODO: finish this function!
    raise NotImplementedError


def path_len(graph, path):
    """
    Sum the edge weights along a path.

    Args:
        graph: Graph exposing ``get_edge_weight(node1, node2)``.
        path (list): Sequence of node keys, in travel order.

    Returns:
        Total weight of the consecutive edges in `path`; 0 for a path with
        fewer than two nodes.
    """
    return sum(
        graph.get_edge_weight(pred, node)
        for pred, node in zip(path, path[1:])
    )


def load_data(graph, time_left):
    """
    Feel free to implement this method. We'll call it only once
    at the beginning of the Race, and we'll pass the output to your
    custom_search function.

    graph: a networkx graph
    time_left: function you can call to keep track of your remaining time.
        usage: time_left() returns the time left in milliseconds.
        the max time will be 10 minutes.

    * To get a list of nodes, use graph.nodes()
    * To get node neighbors, use graph.neighbors(node)
    * To get edge weight, use graph.get_edge_weight(node1, node2)

    Returns:
        None (no preprocessing is currently performed).
    """
    return None