deepxube.base.domain
from abc import ABC, abstractmethod
from typing import List, Tuple, Optional, Set, TypeVar, Generic, Dict, Any
import numpy as np
from clingo.solving import Model as ModelCl

from deepxube.logic.logic_objects import Atom, Model
from deepxube.utils import misc_utils
from deepxube.nnet.nnet_utils import NNetPar, NNetCallable
from deepxube.utils.timing_utils import Times
from matplotlib.figure import Figure
import random
import time
from numpy.typing import NDArray


class State(ABC):
    """ State object.

    Concrete states must be hashable and comparable so that search algorithms
    can deduplicate and re-identify them.
    """
    @abstractmethod
    def __hash__(self) -> int:
        """ For use in CLOSED dictionary for pathfinding.

        :return: hash value
        """
        pass

    @abstractmethod
    def __eq__(self, other: object) -> bool:
        """ For use in state reidentification during pathfinding.

        :param other: other state
        :return: true if they are equal
        """
        pass


class Action(ABC):
    """ Action object.

    Concrete actions must be hashable and comparable so that search algorithms
    (e.g. the backup step of Q* search) can use them as dictionary keys.
    """

    @abstractmethod
    def __hash__(self) -> int:
        """ For use in backup for Q* search.

        :return: hash value
        """
        pass

    @abstractmethod
    def __eq__(self, other: object) -> bool:
        """ For use in backup for Q* search.

        :param other: other action
        :return: true if they are equal
        """
        pass


class Goal(ABC):
    """ Goal object that represents a set of states considered goal states.

    """
    pass


# Type variables used throughout the Domain hierarchy: a domain is generic
# over its state, action, and goal types.
S = TypeVar('S', bound=State)
A = TypeVar('A', bound=Action)
G = TypeVar('G', bound=Goal)


# TODO method for downloading data?
class Domain(ABC, Generic[S, A, G]):
    """ Abstract base class for a pathfinding domain.

    A domain defines states, actions, goals, the transition function, and how
    start/goal training pairs are generated. All state-level operations are
    batched (operate on lists) for efficiency.
    """
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # (name, name, NNetPar) triples registered by subclasses that use
        # neural networks; empty by default.
        self.nnet_pars: List[Tuple[str, str, NNetPar]] = []

    @abstractmethod
    def get_start_goal_pairs(self, num_steps_l: List[int],
                             times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        """ Return start goal pairs with num_steps_l between start and goal

        :param num_steps_l: Number of steps to take between start and goal
        :param times: Times that can be used to profile code
        :return: List of start states and list of goals
        """
        pass

    @abstractmethod
    def get_state_action_rand(self, states: List[S]) -> List[A]:
        """ Get a random action that is applicable to the current state

        :param states: List of states
        :return: List of random actions applicable to given states
        """
        pass

    @abstractmethod
    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Get the next state and transition cost given the current state and action

        :param states: List of states
        :param actions: List of actions to take
        :return: Next states, transition costs
        """
        pass

    @abstractmethod
    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
        """ Returns, for each state, whether it is a member of the set of goal states represented by the
        corresponding goal

        :param states: List of states
        :param goals: List of goals
        :return: List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states
        represented by the goal at index i
        """
        pass

    def next_state_rand(self, states: List[S]) -> Tuple[List[S], List[float]]:
        """ Get random next state and transition cost given the current state

        :param states: List of states
        :return: Next states, transition costs
        """
        actions_rand: List[A] = self.get_state_action_rand(states)
        return self.next_state(states, actions_rand)

    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
        """ Perform a random walk on the given states for the given number of steps

        :param states: List of states
        :param num_steps_l: number of steps to take for each state
        :return: The resulting state and the path cost for each random walk
        """
        # Shallow copy so the caller's list is not mutated.
        states_walk: List[S] = [state for state in states]
        path_costs: List[float] = [0.0 for _ in states]

        # Walk lengths can differ per state, so keep a boolean mask of walks
        # that still have steps remaining and only advance those each round.
        num_steps: NDArray[np.int_] = np.array(num_steps_l)
        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
        while np.any(steps_lt):
            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
            states_to_move = [states_walk[idx] for idx in idxs]

            states_moved, tcs = self.next_state_rand(states_to_move)

            # Scatter moved states and accumulated costs back to their slots.
            idx: int
            for move_idx, idx in enumerate(idxs):
                states_walk[idx] = states_moved[move_idx]
                path_costs[idx] += tcs[move_idx]

            num_steps_curr[idxs] = num_steps_curr[idxs] + 1

            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]

        return states_walk, path_costs

    def get_nnet_pars(self) -> List[Tuple[str, str, NNetPar]]:
        """ :return: neural-network parameter triples registered on this domain """
        return self.nnet_pars

    def set_nnet_fns(self, nnet_fn_dict: Dict[str, NNetCallable]) -> None:
        """ Hook for subclasses that need callable neural networks; no-op by default.

        :param nnet_fn_dict: mapping from network name to callable
        """
        pass


# Visualization Mixins
class StateGoalVizable(Domain[S, A, G]):
    """ Can visualize problem instances

    """
    @abstractmethod
    def visualize_state_goal(self, state: S, goal: G, fig: Figure) -> None:
        """ Draw the given state/goal pair onto the given matplotlib figure. """
        pass


class StringToAct(Domain[S, A, G]):
    """ Can get an action from a string. Used when visualizing problem instances.

    """
    @abstractmethod
    def string_to_action(self, act_str: str) -> Optional[A]:
        """ Parse an action from its string representation.

        :param act_str: A string representation of an action
        :return: The action represented by the string, if it is a valid representation, None otherwise
        """
        pass


class ActsFixed(Domain[S, A, G]):
    """ Mixin for domains whose applicable actions do not depend on the state. """
    @abstractmethod
    def get_action_rand(self, num: int) -> List[A]:
        """ Sample `num` random actions (state-independent). """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        # Actions are state-independent, so only the batch size matters.
        return self.get_action_rand(len(states))


class ActsRev(Domain[S, A, G], ABC):
    """ Actions are reversible.

    """
    @abstractmethod
    def rev_action(self, actions: List[A]) -> List[A]:
        """ Get the reverse of the given action

        :param actions: List of actions
        :return: Reverse of given action
        """
        pass

    @abstractmethod
    def rev_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Transition along the directed edge in the reverse direction.

        :param states: List of states
        :param actions: List of actions to take
        :return: Reverse states, transition costs which are weights of edges taken in reverse
        """
        pass


class ActsEnum(Domain[S, A, G]):
    """ Mixin for domains that can enumerate all actions applicable to a state. """
    @abstractmethod
    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Get actions applicable to each state

        :param states: List of states
        :return: Applicable actions
        """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        state_actions_l: List[List[A]] = self.get_state_actions(states)
        return [random.choice(state_actions) for state_actions in state_actions_l]

    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Generate all children for the state, assumes there is at least one child state
        :param states: List of states
        :return: Children of each state, actions, transition costs for each state
        """
        # TODO further validate
        # initialize
        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
        state_actions: List[List[A]] = self.get_state_actions(states)

        # States can have different numbers of applicable actions; the mask
        # tracks which states still have untried actions.
        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot

        # for each move, get next states, transition costs, and if solved
        while np.any(actions_lt):
            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
            states_idxs: List[S] = [states[idx] for idx in idxs]
            # NOTE: pop(0) consumes the per-state action lists returned by
            # get_state_actions, so implementations must return fresh lists
            # (see ActsEnumFixed.get_state_actions, which copies).
            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]

            # next state
            states_next, tcs_move = self.next_state(states_idxs, actions_idxs)

            # append
            idx: int
            for exp_idx, idx in enumerate(idxs):
                states_exp_l[idx].append(states_next[exp_idx])
                actions_exp_l[idx].append(actions_idxs[exp_idx])
                tcs_l[idx].append(tcs_move[exp_idx])

            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]

        return states_exp_l, actions_exp_l, tcs_l


class ActsEnumFixed(ActsEnum[S, A, G], ActsFixed[S, A, G]):
    """ Enumerable, state-independent action set. """
    def get_action_rand(self, num: int) -> List[A]:
        actions_fixed: List[A] = self.get_actions_fixed()
        return [random.choice(actions_fixed) for _ in range(num)]

    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        # Copy per state: ActsEnum.expand pops from these lists in place.
        return [self.get_actions_fixed().copy() for _ in range(len(states))]

    @abstractmethod
    def get_actions_fixed(self) -> List[A]:
        """ :return: the fixed list of all actions in this domain """
        pass

    def get_num_acts(self) -> int:
        """ :return: total number of actions in this domain """
        return len(self.get_actions_fixed())


# Goal mixins
class GoalSampleable(Domain[S, A, G]):
    """ Can sample goals from states"""
    @abstractmethod
    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Given a state, return a goal that represents a set of goal states of which the given state is a member.
        Does not have to always return the same goal.

        :param states_start: List of start states
        :param states_goal: List of states from which goals will be sampled
        :return: Goals
        """
        pass


class GoalStateSampleable(Domain[S, A, G]):
    """ Can sample states from goals """
    @abstractmethod
    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        """ Given a goal, sample states that are members of that goal.

        :param goals: List of goals
        :param num_states_l: List of integers representing how many states to sample for the corresponding goal
        :return: List of list of states, where each element is a list of states sampled for the corresponding goal
        """
        pass


class GoalFixed(Domain[S, A, G]):
    """ Goal is the same for all problem instances """
    @abstractmethod
    def get_goal(self) -> G:
        """
        :return: Fixed goal
        """
        pass


class StartGoalWalkable(GoalSampleable[S, A, G]):
    """ Can sample start states, take actions to obtain another state, and sample a goal from that state"""
    @abstractmethod
    def get_start_states(self, num_states: int) -> List[S]:
        """ A method for generating start states. Should try to make this generate states that are as diverse as
        possible so that the trained heuristic function generalizes well.

        :param num_states: Number of states to get
        :return: Generated states
        """
        pass

    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        """ Sample start states, walk forward num_steps_l steps, and sample goals from the walk endpoints. """
        # Initialize
        if times is None:
            times = Times()

        # Start states
        start_time = time.time()
        states_start: List[S] = self.get_start_states(len(num_steps_l))
        times.record_time("get_start_states", time.time() - start_time)

        # random walk
        start_time = time.time()
        states_goal: List[S] = self.random_walk(states_start, num_steps_l)[0]
        times.record_time("random_walk", time.time() - start_time)

        # state to goal
        start_time = time.time()
        goals: List[G] = self.sample_goal(states_start, states_goal)
        times.record_time("sample_goal", time.time() - start_time)

        return states_start, goals


class GoalStateSampleableFixed(GoalStateSampleable[S, A, G], GoalFixed[S, A, G]):
    """ Can sample states from goal, which is the same for all problem instances """

    @abstractmethod
    def sample_goal_states_fixed(self, num_states: int) -> List[S]:
        """ Sample num_states states that are members of the fixed goal. """
        pass

    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        # The goal is fixed, so the goals argument is ignored; only the
        # requested counts matter.
        return [self.sample_goal_states_fixed(num_states) for num_states in num_states_l]


# reverse walks
class FixedGoalRevWalk(GoalStateSampleableFixed[S, A, G]):
    """ Generates start/goal pairs by walking backward from sampled goal states. """
    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        # Initialize
        if times is None:
            times = Times()

        # Start states
        start_time = time.time()
        states_goal: List[S] = self.sample_goal_states_fixed(len(num_steps_l))
        # NOTE(review): the timing label "get_start_states" is reused here
        # although this step samples goal states — presumably kept for
        # consistent aggregation across implementations; confirm.
        times.record_time("get_start_states", time.time() - start_time)

        # random walk
        start_time = time.time()
        states_start: List[S] = self.random_walk_rev(states_goal, num_steps_l)
        times.record_time("random_walk", time.time() - start_time)

        # state to goal
        start_time = time.time()
        goals: List[G] = [self.get_goal()] * len(states_start)
        times.record_time("sample_goal", time.time() - start_time)

        return states_start, goals

    @abstractmethod
    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        """ Walk backward from the given states for the given numbers of steps. """
        pass


class FixedGoalRevWalkActsRev(FixedGoalRevWalk[S, A, G], ActsRev[S, A, G], ABC):
    """ Reverse walk for domains with reversible actions. """
    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        # A forward random walk is used; since actions are reversible
        # (ActsRev), states reached this way can presumably reach the goal
        # state by the reverse actions — TODO confirm.
        return self.random_walk(states, num_steps_l)[0]


# numpy convenience mixins
class NextStateNP(Domain[S, A, G]):
    """ Mixin that implements transitions on numpy array representations of states. """
    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        # Convert to numpy, transition there, convert back.
        states_np: List[NDArray] = self._states_to_np(states)
        states_next_np, tcs = self._next_state_np(states_np, actions)
        states_next: List[S] = self._np_to_states(states_next_np)

        return states_next, tcs

    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
        """ Random walk performed entirely in the numpy representation (converted back once at the end). """
        # Assumes _states_to_np returns arrays owned by this call — they are
        # updated in place below. TODO confirm implementations return copies.
        states_np = self._states_to_np(states)
        path_costs: List[float] = [0.0 for _ in states]

        num_steps: NDArray[np.int_] = np.array(num_steps_l)
        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
        while np.any(steps_lt):
            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
            states_np_tomove: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            actions_rand: List[A] = self._get_state_np_action_rand(states_np_tomove)

            states_moved, tcs = self._next_state_np(states_np_tomove, actions_rand)

            # Scatter moved rows back into each array of the representation.
            for l_idx in range(len(states_np)):
                states_np[l_idx][idxs] = states_moved[l_idx]
            idx: int
            for act_idx, idx in enumerate(idxs):
                path_costs[idx] += tcs[act_idx]

            num_steps_curr[idxs] = num_steps_curr[idxs] + 1

            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]

        return self._np_to_states(states_np), path_costs

    @abstractmethod
    def _states_to_np(self, states: List[S]) -> List[NDArray]:
        """ Convert states to their numpy representation (one row per state in each array). """
        pass

    @abstractmethod
    def _np_to_states(self, states_np_l: List[NDArray]) -> List[S]:
        """ Convert the numpy representation back to state objects. """
        pass

    @abstractmethod
    def _get_state_np_actions(self, states_np_l: List[NDArray]) -> List[List[A]]:
        """ Get the applicable actions for each state in the numpy representation. """
        pass

    def _get_state_np_action_rand(self, states_np: List[NDArray]) -> List[A]:
        state_actions_l: List[List[A]] = self._get_state_np_actions(states_np)
        return [random.choice(state_actions) for state_actions in state_actions_l]

    @abstractmethod
    def _next_state_np(self, states_np: List[NDArray], actions: List[A]) -> Tuple[List[NDArray], List[float]]:
        """ Get the next state and transition cost given the current numpy representations of the state and action


        @param states_np: numpy representation of states. Each row in each element of states_np list represents
        information for a different state. There can be one or more multiple elements in the list for each state.
        This object should not be mutated.
        @param actions: actions
        @return: Numpy representation of next states, transition costs
        """
        pass


class NextStateNPActsEnum(NextStateNP[S, A, G], ActsEnum[S, A, G], ABC):
    """ ActsEnum.expand reimplemented on the numpy representation to avoid per-step conversions. """
    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        # initialize
        states_np: List[NDArray] = self._states_to_np(states)
        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
        state_actions: List[List[A]] = self.get_state_actions(states)

        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot

        # for each move, get next states, transition costs, and if solved
        while np.any(actions_lt):
            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
            states_np_idxs: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            # pop(0) consumes the per-state action lists (same contract as
            # ActsEnum.expand).
            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]

            # next state
            states_next_np, tcs_move = self._next_state_np(states_np_idxs, actions_idxs)
            states_next: List[S] = self._np_to_states(states_next_np)

            # append
            idx: int
            for exp_idx, idx in enumerate(idxs):
                states_exp_l[idx].append(states_next[exp_idx])
                actions_exp_l[idx].append(actions_idxs[exp_idx])
                tcs_l[idx].append(tcs_move[exp_idx])

            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]

        return states_exp_l, actions_exp_l, tcs_l


class NextStateNPActsEnumFixed(NextStateNPActsEnum[S, A, G], ActsEnumFixed[S, A, G], ABC):
    """ Numpy-backed domain with an enumerable, state-independent action set. """
    def _get_state_np_actions(self, states_np: List[NDArray]) -> List[List[A]]:
        state_actions: List[A] = self.get_actions_fixed()
        # One copy per state row; the batch size is the first array's row count.
        return [state_actions.copy() for _ in range(states_np[0].shape[0])]


class SupportsPDDL(Domain[S, A, G], ABC):
    """ Mixin for domains that can be expressed in PDDL. """
    @abstractmethod
    def get_pddl_domain(self) -> List[str]:
        """ :return: lines of the PDDL domain definition """
        pass

    @abstractmethod
    def state_goal_to_pddl_inst(self, state: S, goal: G) -> List[str]:
        """ :return: lines of a PDDL problem instance for the given state/goal pair """
        pass

    @abstractmethod
    def pddl_action_to_action(self, pddl_action: str) -> A:
        """ Convert a PDDL action string to a domain action. """
        pass


class GoalGrndAtoms(GoalSampleable[S, A, G]):
    """ Mixin for domains whose states and goals can be represented as logical models (sets of ground atoms). """
    @abstractmethod
    def state_to_model(self, states: List[S]) -> List[Model]:
        """ Convert states to logical models. """
        pass

    @abstractmethod
    def model_to_state(self, models: List[Model]) -> List[S]:
        """ Assumes model is a fully specified state

        :param models: logical models
        :return: corresponding states
        """
        pass

    @abstractmethod
    def goal_to_model(self, goals: List[G]) -> List[Model]:
        """ Convert goals to logical models. """
        pass

    @abstractmethod
    def model_to_goal(self, models: List[Model]) -> List[G]:
        """ Convert logical models to goals. """
        pass

    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
        """ Returns whether or not each state is solved

        :param states: List of states
        :param goals: List of goals
        :return: List of booleans where the element at index i corresponds to whether or not the
        state at index i is solved
        """
        # A state satisfies a goal when the goal's atoms are a subset of the
        # state's atoms.
        models_g: List[Model] = self.goal_to_model(goals)
        is_solved_l: List[bool] = []
        models_s: List[Model] = self.state_to_model(states)
        for model_state, model_goal in zip(models_s, models_g):
            is_solved_l.append(model_goal.issubset(model_state))

        return is_solved_l

    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Sample goals by keeping a random subset of each goal state's ground atoms. """
        models_g: List[Model] = []

        models_s: List[Model] = self.state_to_model(states_goal)
        # Per-state keep probability, drawn uniformly from [0, 1).
        keep_probs: NDArray[np.float64] = np.random.rand(len(states_goal))
        for model_s, keep_prob in zip(models_s, keep_probs):
            rand_subset: Set[Atom] = misc_utils.random_subset(model_s, keep_prob)
            models_g.append(frozenset(rand_subset))

        return self.model_to_goal(models_g)

    @abstractmethod
    def get_bk(self) -> List[str]:
        """ get background knowledge, each element in list is a line

        :return: lines of background knowledge
        """
        pass

    @abstractmethod
    def get_ground_atoms(self) -> List[Atom]:
        """ Get all possible ground atoms that can be used to make a state

        :return: all possible ground atoms
        """
        pass

    @abstractmethod
    def on_model(self, m: ModelCl) -> Model:
        """ Process results from clingo

        :param m: clingo model
        :return: converted model
        """
        pass

    @abstractmethod
    def start_state_fixed(self, states: List[S]) -> List[Model]:
        """ Given the start state, what must also be true for the goal state (i.e. immovable walls)

        :param states: List of states
        :return: model of atoms that must hold in the goal for each state
        """
        pass
17class State(ABC): 18 """ State object 19 20 """ 21 @abstractmethod 22 def __hash__(self) -> int: 23 """ For use in CLOSED dictionary for pathfinding 24 :return: hash value 25 """ 26 pass 27 28 @abstractmethod 29 def __eq__(self, other: object) -> bool: 30 """ for use in state reidentification during pathfinding 31 32 :param other: other state 33 :return: true if they are equal 34 """ 35 pass
State object
38class Action(ABC): 39 """ Action object 40 41 """ 42 43 @abstractmethod 44 def __hash__(self) -> int: 45 """ For use in backup for Q* search 46 :return: hash value 47 """ 48 pass 49 50 @abstractmethod 51 def __eq__(self, other: object) -> bool: 52 """ for use in backup for Q* search 53 54 :param other: other state 55 :return: true if they are equal 56 """ 57 pass
Action object
60class Goal(ABC): 61 """ Goal object that represents a set of states considered goal states 62 63 """ 64 pass
Goal object that represents a set of states considered goal states
73class Domain(ABC, Generic[S, A, G]): 74 def __init__(self, *args: Any, **kwargs: Any) -> None: 75 self.nnet_pars: List[Tuple[str, str, NNetPar]] = [] 76 77 @abstractmethod 78 def get_start_goal_pairs(self, num_steps_l: List[int], 79 times: Optional[Times] = None) -> Tuple[List[S], List[G]]: 80 """ Return start goal pairs with num_steps_l between start and goal 81 82 :param num_steps_l: Number of steps to take between start and goal 83 :param times: Times that can be used to profile code 84 :return: List of start states and list of goals 85 """ 86 pass 87 88 @abstractmethod 89 def get_state_action_rand(self, states: List[S]) -> List[A]: 90 """ Get a random action that is applicable to the current state 91 92 :param states: List of states 93 :return: List of random actions applicable to given states 94 """ 95 pass 96 97 @abstractmethod 98 def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]: 99 """ Get the next state and transition cost given the current state and action 100 101 :param states: List of states 102 :param actions: List of actions to take 103 :return: Next states, transition costs 104 """ 105 pass 106 107 @abstractmethod 108 def is_solved(self, states: List[S], goals: List[G]) -> List[bool]: 109 """ Returns true if the state is a member of the set of goal states represented by the goal 110 111 :param states: List of states 112 :param goals: List of goals 113 :return: List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states 114 represented by the goal at index i 115 """ 116 pass 117 118 def next_state_rand(self, states: List[S]) -> Tuple[List[S], List[float]]: 119 """ Get random next state and transition cost given the current state 120 121 :param states: List of states 122 :return: Next states, transition costs 123 """ 124 actions_rand: List[A] = self.get_state_action_rand(states) 125 return self.next_state(states, actions_rand) 126 127 def 
random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]: 128 """ Perform a random walk on the given states for the given number of steps 129 130 :param states: List of states 131 :param num_steps_l: number of steps to take for each state 132 :return: The resulting state and the path cost for each random walk 133 """ 134 states_walk: List[S] = [state for state in states] 135 path_costs: List[float] = [0.0 for _ in states] 136 137 num_steps: NDArray[np.int_] = np.array(num_steps_l) 138 num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int) 139 steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps 140 while np.any(steps_lt): 141 idxs: NDArray[np.int_] = np.where(steps_lt)[0] 142 states_to_move = [states_walk[idx] for idx in idxs] 143 144 states_moved, tcs = self.next_state_rand(states_to_move) 145 146 idx: int 147 for move_idx, idx in enumerate(idxs): 148 states_walk[idx] = states_moved[move_idx] 149 path_costs[idx] += tcs[move_idx] 150 151 num_steps_curr[idxs] = num_steps_curr[idxs] + 1 152 153 steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs] 154 155 return states_walk, path_costs 156 157 def get_nnet_pars(self) -> List[Tuple[str, str, NNetPar]]: 158 return self.nnet_pars 159 160 def set_nnet_fns(self, nnet_fn_dict: Dict[str, NNetCallable]) -> None: 161 pass
Abstract base class for a pathfinding domain: defines the state, action, and goal types and the batched operations (transitions, solved checks, start/goal pair generation) that concrete domains implement.
77 @abstractmethod 78 def get_start_goal_pairs(self, num_steps_l: List[int], 79 times: Optional[Times] = None) -> Tuple[List[S], List[G]]: 80 """ Return start goal pairs with num_steps_l between start and goal 81 82 :param num_steps_l: Number of steps to take between start and goal 83 :param times: Times that can be used to profile code 84 :return: List of start states and list of goals 85 """ 86 pass
Return start goal pairs with num_steps_l between start and goal
Parameters
- num_steps_l: Number of steps to take between start and goal
- times: Times that can be used to profile code
Returns
List of start states and list of goals
88 @abstractmethod 89 def get_state_action_rand(self, states: List[S]) -> List[A]: 90 """ Get a random action that is applicable to the current state 91 92 :param states: List of states 93 :return: List of random actions applicable to given states 94 """ 95 pass
Get a random action that is applicable to the current state
Parameters
- states: List of states
Returns
List of random actions applicable to given states
97 @abstractmethod 98 def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]: 99 """ Get the next state and transition cost given the current state and action 100 101 :param states: List of states 102 :param actions: List of actions to take 103 :return: Next states, transition costs 104 """ 105 pass
Get the next state and transition cost given the current state and action
Parameters
- states: List of states
- actions: List of actions to take
Returns
Next states, transition costs
107 @abstractmethod 108 def is_solved(self, states: List[S], goals: List[G]) -> List[bool]: 109 """ Returns true if the state is a member of the set of goal states represented by the goal 110 111 :param states: List of states 112 :param goals: List of goals 113 :return: List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states 114 represented by the goal at index i 115 """ 116 pass
Returns, for each state, whether it is a member of the set of goal states represented by the corresponding goal
Parameters
- states: List of states
- goals: List of goals
Returns
List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states represented by the goal at index i
118 def next_state_rand(self, states: List[S]) -> Tuple[List[S], List[float]]: 119 """ Get random next state and transition cost given the current state 120 121 :param states: List of states 122 :return: Next states, transition costs 123 """ 124 actions_rand: List[A] = self.get_state_action_rand(states) 125 return self.next_state(states, actions_rand)
Get random next state and transition cost given the current state
Parameters
- states: List of states
Returns
Next states, transition costs
127 def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]: 128 """ Perform a random walk on the given states for the given number of steps 129 130 :param states: List of states 131 :param num_steps_l: number of steps to take for each state 132 :return: The resulting state and the path cost for each random walk 133 """ 134 states_walk: List[S] = [state for state in states] 135 path_costs: List[float] = [0.0 for _ in states] 136 137 num_steps: NDArray[np.int_] = np.array(num_steps_l) 138 num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int) 139 steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps 140 while np.any(steps_lt): 141 idxs: NDArray[np.int_] = np.where(steps_lt)[0] 142 states_to_move = [states_walk[idx] for idx in idxs] 143 144 states_moved, tcs = self.next_state_rand(states_to_move) 145 146 idx: int 147 for move_idx, idx in enumerate(idxs): 148 states_walk[idx] = states_moved[move_idx] 149 path_costs[idx] += tcs[move_idx] 150 151 num_steps_curr[idxs] = num_steps_curr[idxs] + 1 152 153 steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs] 154 155 return states_walk, path_costs
Perform a random walk on the given states for the given number of steps
Parameters
- states: List of states
- num_steps_l: number of steps to take for each state
Returns
The resulting state and the path cost for each random walk
165class StateGoalVizable(Domain[S, A, G]): 166 """ Can visualize problem instances 167 168 """ 169 @abstractmethod 170 def visualize_state_goal(self, state: S, goal: G, fig: Figure) -> None: 171 pass
Can visualize problem instances
174class StringToAct(Domain[S, A, G]): 175 """ Can get an action from a string. Used when visualizing problem instances. 176 177 """ 178 @abstractmethod 179 def string_to_action(self, act_str: str) -> Optional[A]: 180 """ 181 :param act_str: A string representation of an action 182 :return: The action represented by the string, if it is a valid representation, None otherwise 183 """ 184 pass
Can get an action from a string. Used when visualizing problem instances.
178 @abstractmethod 179 def string_to_action(self, act_str: str) -> Optional[A]: 180 """ 181 :param act_str: A string representation of an action 182 :return: The action represented by the string, if it is a valid representation, None otherwise 183 """ 184 pass
Parameters
- act_str: A string representation of an action
Returns
The action represented by the string, if it is a valid representation, None otherwise
187class ActsFixed(Domain[S, A, G]): 188 @abstractmethod 189 def get_action_rand(self, num: int) -> List[A]: 190 pass 191 192 def get_state_action_rand(self, states: List[S]) -> List[A]: 193 return self.get_action_rand(len(states))
Domain mixin for which the set of applicable actions is fixed and does not depend on the state.
192 def get_state_action_rand(self, states: List[S]) -> List[A]: 193 return self.get_action_rand(len(states))
Get a random action that is applicable to the current state
Parameters
- states: List of states
Returns
List of random actions applicable to given states
196class ActsRev(Domain[S, A, G], ABC): 197 """ Actions are reversible. 198 199 """ 200 @abstractmethod 201 def rev_action(self, actions: List[A]) -> List[A]: 202 """ Get the reverse of the given action 203 204 :param actions: List of actions 205 :return: Reverse of given action 206 """ 207 pass 208 209 @abstractmethod 210 def rev_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]: 211 """ Transition along the directed edge in the reverse direction. 212 213 :param states: List of states 214 :param actions: List of actions to take 215 :return: Reverse states, transition costs which are weights of edges taken in reverse 216 """ 217 pass
Actions are reversible.
200 @abstractmethod 201 def rev_action(self, actions: List[A]) -> List[A]: 202 """ Get the reverse of the given action 203 204 :param actions: List of actions 205 :return: Reverse of given action 206 """ 207 pass
Get the reverse of the given action
Parameters
- actions: List of actions
Returns
Reverse of given action
209 @abstractmethod 210 def rev_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]: 211 """ Transition along the directed edge in the reverse direction. 212 213 :param states: List of states 214 :param actions: List of actions to take 215 :return: Reverse states, transition costs which are weights of edges taken in reverse 216 """ 217 pass
Transition along the directed edge in the reverse direction.
Parameters
- states: List of states
- actions: List of actions to take
Returns
Reverse states, transition costs which are weights of edges taken in reverse
class ActsEnum(Domain[S, A, G]):
    """ Domain whose applicable actions can be enumerated for each state. """

    @abstractmethod
    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Enumerate the actions applicable to each state

        :param states: List of states
        :return: For each state, the list of applicable actions
        """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        """ Pick one applicable action, uniformly at random, for each state

        :param states: List of states
        :return: List of random actions applicable to the given states
        """
        return [random.choice(acts) for acts in self.get_state_actions(states)]

    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Generate all children for the state, assumes there is at least one child state

        :param states: List of states
        :return: Children of each state, actions, transition costs for each state
        """
        # TODO further validate
        num_states: int = len(states)
        children: List[List[S]] = [[] for _ in range(num_states)]
        acts_taken: List[List[A]] = [[] for _ in range(num_states)]
        costs: List[List[float]] = [[] for _ in range(num_states)]
        remaining: List[List[A]] = self.get_state_actions(states)

        acts_total: NDArray[np.int_] = np.array([len(acts) for acts in remaining])
        acts_done: NDArray[np.int_] = np.zeros(num_states, dtype=int)
        active: NDArray[np.bool_] = acts_done < acts_total

        # each pass takes one untried action per still-active state, batching next_state calls
        while np.any(active):
            active_idxs: NDArray[np.int_] = np.where(active)[0]
            batch_states: List[S] = [states[i] for i in active_idxs]
            batch_acts: List[A] = [remaining[i].pop(0) for i in active_idxs]

            # batched transition for all active states
            batch_next, batch_tcs = self.next_state(batch_states, batch_acts)

            i: int
            for b_idx, i in enumerate(active_idxs):
                children[i].append(batch_next[b_idx])
                acts_taken[i].append(batch_acts[b_idx])
                costs[i].append(batch_tcs[b_idx])

            acts_done[active_idxs] = acts_done[active_idxs] + 1
            active[active_idxs] = acts_done[active_idxs] < acts_total[active_idxs]

        return children, acts_taken, costs
Domain whose applicable actions can be enumerated for each state, enabling random action sampling and full expansion of a state's children.
221 @abstractmethod 222 def get_state_actions(self, states: List[S]) -> List[List[A]]: 223 """ Get actions applicable to each states 224 225 :param states: List of states 226 :return: Applicable actions 227 """ 228 pass
Get the actions applicable to each state
Parameters
- states: List of states
Returns
Applicable actions
230 def get_state_action_rand(self, states: List[S]) -> List[A]: 231 state_actions_l: List[List[A]] = self.get_state_actions(states) 232 return [random.choice(state_actions) for state_actions in state_actions_l]
Get a random action that is applicable to the current state
Parameters
- states: List of states
Returns
List of random actions applicable to given states
234 def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]: 235 """ Generate all children for the state, assumes there is at least one child state 236 :param states: List of states 237 :return: Children of each state, actions, transition costs for each state 238 """ 239 # TODO further validate 240 # initialize 241 states_exp_l: List[List[S]] = [[] for _ in range(len(states))] 242 actions_exp_l: List[List[A]] = [[] for _ in range(len(states))] 243 tcs_l: List[List[float]] = [[] for _ in range(len(states))] 244 state_actions: List[List[A]] = self.get_state_actions(states) 245 246 num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions]) 247 num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int) 248 actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot 249 250 # for each move, get next states, transition costs, and if solved 251 while np.any(actions_lt): 252 idxs: NDArray[np.int_] = np.where(actions_lt)[0] 253 states_idxs: List[S] = [states[idx] for idx in idxs] 254 actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs] 255 256 # next state 257 states_next, tcs_move = self.next_state(states_idxs, actions_idxs) 258 259 # append 260 idx: int 261 for exp_idx, idx in enumerate(idxs): 262 states_exp_l[idx].append(states_next[exp_idx]) 263 actions_exp_l[idx].append(actions_idxs[exp_idx]) 264 tcs_l[idx].append(tcs_move[exp_idx]) 265 266 num_actions_taken[idxs] = num_actions_taken[idxs] + 1 267 actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs] 268 269 return states_exp_l, actions_exp_l, tcs_l
Generate all children for the state, assumes there is at least one child state
Parameters
- states: List of states
Returns
Children of each state, actions, transition costs for each state
class ActsEnumFixed(ActsEnum[S, A, G], ActsFixed[S, A, G]):
    """ Domain with a fixed, state-independent action set that can be enumerated. """

    def get_action_rand(self, num: int) -> List[A]:
        """ Sample actions uniformly at random from the fixed action set

        :param num: number of actions to sample
        :return: List of num random actions
        """
        actions_fixed: List[A] = self.get_actions_fixed()
        return [random.choice(actions_fixed) for _ in range(num)]

    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Every state gets an independent copy of the fixed action list (copies so
        callers such as expand can pop from them without affecting each other).

        :param states: List of states
        :return: Applicable actions for each state
        """
        # hoisted out of the comprehension: call get_actions_fixed once, not once per state
        actions_fixed: List[A] = self.get_actions_fixed()
        return [actions_fixed.copy() for _ in range(len(states))]

    @abstractmethod
    def get_actions_fixed(self) -> List[A]:
        """ :return: the fixed list of all actions for this domain """
        pass

    def get_num_acts(self) -> int:
        """ :return: number of actions in the fixed action set """
        return len(self.get_actions_fixed())
Helper class that provides a standard way to create an ABC using inheritance.
277 def get_state_actions(self, states: List[S]) -> List[List[A]]: 278 return [self.get_actions_fixed().copy() for _ in range(len(states))]
Get the actions applicable to each state
Parameters
- states: List of states
Returns
Applicable actions
class GoalSampleable(Domain[S, A, G]):
    """ Can sample goals from states"""
    @abstractmethod
    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Given a state, return a goal that represents a set of goal states of which the given state is a member.
        Does not have to always return the same goal.

        :param states_start: List of start states
        :param states_goal: List of states from which goals will be sampled
        :return: Goals
        """
        pass
Can sample goals from states
291 @abstractmethod 292 def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]: 293 """ Given a state, return a goal that represents a set of goal states of which the given state is a member. 294 Does not have to always return the same goal. 295 296 :param states_start: List of start states 297 :param states_goal List of states from which goals will be sampled 298 :return: Goals 299 """ 300 pass
Given a state, return a goal that represents a set of goal states of which the given state is a member. Does not have to always return the same goal.
Parameters
- states_start: List of start states
- states_goal: List of states from which goals will be sampled
Returns
Goals
class GoalStateSampleable(Domain[S, A, G]):
    """ Can sample states from goals: given a goal, produce member states of that goal. """
    @abstractmethod
    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        """ Given a goal, sample states that are members of that goal.

        :param goals: List of goals
        :param num_states_l: List of integers representing how many states to sample for the corresponding goal
        :return: List of list of states, where each element is a list of states sampled for the corresponding goal
        """
        pass
Can sample states from goals
305 @abstractmethod 306 def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]: 307 """ Given a goal, sample states that are members of that goal. 308 309 :param goals: List of goals 310 :param num_states_l: List of integers representing how many states to sample for the corresponding goal 311 :return: List of list of states, where each element is a list of states sampled for the corresponding goal 312 """ 313 pass
Given a goal, sample states that are members of that goal.
Parameters
- goals: List of goals
- num_states_l: List of integers representing how many states to sample for the corresponding goal
Returns
List of list of states, where each element is a list of states sampled for the corresponding goal
class GoalFixed(Domain[S, A, G]):
    """ Goal is the same for all problem instances """
    @abstractmethod
    def get_goal(self) -> G:
        """ Get the single goal shared by all problem instances

        :return: Fixed goal
        """
        pass
Goal is the same for all problem instances
class StartGoalWalkable(GoalSampleable[S, A, G]):
    """ Builds start/goal pairs by sampling start states, random-walking from them,
    and sampling a goal from each walk's end state.
    """

    @abstractmethod
    def get_start_states(self, num_states: int) -> List[S]:
        """ Generate start states. Implementations should aim for states that are as
        diverse as possible so that the trained heuristic function generalizes well.

        :param num_states: Number of states to get
        :return: Generated states
        """
        pass

    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        """ Return start goal pairs with num_steps_l between start and goal

        :param num_steps_l: Number of steps to take between start and goal
        :param times: Times used to profile each phase; created fresh if not given
        :return: List of start states and list of goals
        """
        times = times if times is not None else Times()

        # sample start states
        t0 = time.time()
        starts: List[S] = self.get_start_states(len(num_steps_l))
        times.record_time("get_start_states", time.time() - t0)

        # random walk forward from the starts
        t0 = time.time()
        walk_ends: List[S] = self.random_walk(starts, num_steps_l)[0]
        times.record_time("random_walk", time.time() - t0)

        # turn each walk-end state into a goal
        t0 = time.time()
        goals: List[G] = self.sample_goal(starts, walk_ends)
        times.record_time("sample_goal", time.time() - t0)

        return starts, goals
Can sample start states, take actions to obtain another state, and sample a goal from that state
328 @abstractmethod 329 def get_start_states(self, num_states: int) -> List[S]: 330 """ A method for generating start states. Should try to make this generate states that are as diverse as 331 possible so that the trained heuristic function generalizes well. 332 333 :param num_states: Number of states to get 334 :return: Generated states 335 """ 336 pass
A method for generating start states. Should try to make this generate states that are as diverse as possible so that the trained heuristic function generalizes well.
Parameters
- num_states: Number of states to get
Returns
Generated states
338 def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]: 339 # Initialize 340 if times is None: 341 times = Times() 342 343 # Start states 344 start_time = time.time() 345 states_start: List[S] = self.get_start_states(len(num_steps_l)) 346 times.record_time("get_start_states", time.time() - start_time) 347 348 # random walk 349 start_time = time.time() 350 states_goal: List[S] = self.random_walk(states_start, num_steps_l)[0] 351 times.record_time("random_walk", time.time() - start_time) 352 353 # state to goal 354 start_time = time.time() 355 goals: List[G] = self.sample_goal(states_start, states_goal) 356 times.record_time("sample_goal", time.time() - start_time) 357 358 return states_start, goals
Return start goal pairs with num_steps_l between start and goal
Parameters
- num_steps_l: Number of steps to take between start and goal
- times: Times that can be used to profile code
Returns
List of start states and list of goals
class GoalStateSampleableFixed(GoalStateSampleable[S, A, G], GoalFixed[S, A, G]):
    """ Can sample states from goal, which is the same for all problem instances """

    @abstractmethod
    def sample_goal_states_fixed(self, num_states: int) -> List[S]:
        """ Sample states that are members of the fixed goal

        :param num_states: number of states to sample
        :return: sampled goal states
        """
        pass

    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        """ Sample member states for each goal; the goals argument is not inspected
        since the goal is the same for every problem instance.

        :param goals: List of goals (ignored)
        :param num_states_l: how many states to sample for the corresponding goal
        :return: list of sampled-state lists, one per goal
        """
        sampled: List[List[S]] = []
        for num_states in num_states_l:
            sampled.append(self.sample_goal_states_fixed(num_states))
        return sampled
Can sample states from goal, which is the same for all problem instances
368 def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]: 369 return [self.sample_goal_states_fixed(num_states) for num_states in num_states_l]
Given a goal, sample states that are members of that goal.
Parameters
- goals: List of goals
- num_states_l: List of integers representing how many states to sample for the corresponding goal
Returns
List of list of states, where each element is a list of states sampled for the corresponding goal
class FixedGoalRevWalk(GoalStateSampleableFixed[S, A, G]):
    """ Builds start/goal pairs by sampling goal states and walking backwards from them. """

    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        """ Return start goal pairs with num_steps_l between start and goal

        :param num_steps_l: Number of steps to take between start and goal
        :param times: Times used to profile each phase; created fresh if not given
        :return: List of start states and list of goals
        """
        times = times if times is not None else Times()

        # sample states that satisfy the fixed goal
        t0 = time.time()
        goal_states: List[S] = self.sample_goal_states_fixed(len(num_steps_l))
        # NOTE(review): timing keys mirror StartGoalWalkable.get_start_goal_pairs even though
        # the operations differ here; kept as-is so profiling aggregation stays comparable
        times.record_time("get_start_states", time.time() - t0)

        # reverse random walk from goal states back to start states
        t0 = time.time()
        start_states: List[S] = self.random_walk_rev(goal_states, num_steps_l)
        times.record_time("random_walk", time.time() - t0)

        # goal is fixed, so every pair shares the same goal
        t0 = time.time()
        goals: List[G] = [self.get_goal()] * len(start_states)
        times.record_time("sample_goal", time.time() - t0)

        return start_states, goals

    @abstractmethod
    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        """ Walk backwards from the given states for the given number of steps

        :param states: List of states
        :param num_steps_l: number of reverse steps per state
        :return: states reached by the reverse walks
        """
        pass
Can sample states from goal, which is the same for all problem instances
374 def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]: 375 # Initialize 376 if times is None: 377 times = Times() 378 379 # Start states 380 start_time = time.time() 381 states_goal: List[S] = self.sample_goal_states_fixed(len(num_steps_l)) 382 times.record_time("get_start_states", time.time() - start_time) 383 384 # random walk 385 start_time = time.time() 386 states_start: List[S] = self.random_walk_rev(states_goal, num_steps_l) 387 times.record_time("random_walk", time.time() - start_time) 388 389 # state to goal 390 start_time = time.time() 391 goals: List[G] = [self.get_goal()] * len(states_start) 392 times.record_time("sample_goal", time.time() - start_time) 393 394 return states_start, goals
Return start goal pairs with num_steps_l between start and goal
Parameters
- num_steps_l: Number of steps to take between start and goal
- times: Times that can be used to profile code
Returns
List of start states and list of goals
class FixedGoalRevWalkActsRev(FixedGoalRevWalk[S, A, G], ActsRev[S, A, G], ABC):
    """ Reverse walk implemented as a forward random walk; presumably valid because
    actions are reversible — TODO confirm the forward walk is intended here rather
    than rev_state.
    """

    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        """ Walk backwards by taking a forward random walk; path costs are discarded """
        states_end, _ = self.random_walk(states, num_steps_l)
        return states_end
Can sample states from goal, which is the same for all problem instances
class NextStateNP(Domain[S, A, G]):
    """ Domain whose transitions are computed on numpy representations of states.
    Subclasses implement conversion to/from numpy and the numpy-level transition;
    next_state and random_walk are provided on top of those.
    """

    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Get the next state and transition cost given the current state and action.
        Converts states to numpy, transitions, and converts back.

        :param states: List of states
        :param actions: List of actions to take
        :return: Next states, transition costs
        """
        states_np: List[NDArray] = self._states_to_np(states)
        states_next_np, tcs = self._next_state_np(states_np, actions)
        states_next: List[S] = self._np_to_states(states_next_np)

        return states_next, tcs

    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
        """ Perform a random walk on the given states for the given number of steps

        :param states: List of states
        :param num_steps_l: number of steps to take for each state
        :return: The resulting state and the accumulated path cost for each random walk
        """
        states_np: List[NDArray] = self._states_to_np(states)
        path_costs: List[float] = [0.0 for _ in states]

        num_steps: NDArray[np.int_] = np.array(num_steps_l)
        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
        # each iteration advances, by one random action, every walk with steps remaining
        while np.any(steps_lt):
            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
            states_np_tomove: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            actions_rand: List[A] = self._get_state_np_action_rand(states_np_tomove)

            states_moved, tcs = self._next_state_np(states_np_tomove, actions_rand)

            # write the moved rows back into the full numpy representation in place
            for l_idx in range(len(states_np)):
                states_np[l_idx][idxs] = states_moved[l_idx]
            idx: int
            for act_idx, idx in enumerate(idxs):
                path_costs[idx] += tcs[act_idx]

            num_steps_curr[idxs] = num_steps_curr[idxs] + 1

            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]

        return self._np_to_states(states_np), path_costs

    @abstractmethod
    def _states_to_np(self, states: List[S]) -> List[NDArray]:
        """ Convert states to their numpy representation (see _next_state_np for layout)

        :param states: List of states
        :return: numpy representation, one or more arrays with a row per state
        """
        pass

    @abstractmethod
    def _np_to_states(self, states_np_l: List[NDArray]) -> List[S]:
        """ Convert numpy representations back to state objects

        :param states_np_l: numpy representation of states
        :return: state objects, one per row
        """
        pass

    @abstractmethod
    def _get_state_np_actions(self, states_np_l: List[NDArray]) -> List[List[A]]:
        """ Enumerate applicable actions for each numpy-represented state

        :param states_np_l: numpy representation of states
        :return: applicable actions per state
        """
        pass

    def _get_state_np_action_rand(self, states_np: List[NDArray]) -> List[A]:
        """ Pick one applicable action at random for each numpy-represented state

        :param states_np: numpy representation of states
        :return: one random applicable action per state
        """
        state_actions_l: List[List[A]] = self._get_state_np_actions(states_np)
        return [random.choice(state_actions) for state_actions in state_actions_l]

    @abstractmethod
    def _next_state_np(self, states_np: List[NDArray], actions: List[A]) -> Tuple[List[NDArray], List[float]]:
        """ Get the next state and transition cost given the current numpy representations of the state and action


        @param states_np: numpy representation of states. Each row in each element of states_np list represents
        information for a different state. There can be one or more multiple elements in the list for each state.
        This object should not be mutated.
        @param actions: actions
        @return: Numpy representation of next states, transition costs
        """
        pass
Helper class that provides a standard way to create an ABC using inheritance.
408 def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]: 409 states_np: List[NDArray] = self._states_to_np(states) 410 states_next_np, tcs = self._next_state_np(states_np, actions) 411 states_next: List[S] = self._np_to_states(states_next_np) 412 413 return states_next, tcs
Get the next state and transition cost given the current state and action
Parameters
- states: List of states
- actions: List of actions to take
Returns
Next states, transition costs
415 def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]: 416 states_np = self._states_to_np(states) 417 path_costs: List[float] = [0.0 for _ in states] 418 419 num_steps: NDArray[np.int_] = np.array(num_steps_l) 420 num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int) 421 steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps 422 while np.any(steps_lt): 423 idxs: NDArray[np.int_] = np.where(steps_lt)[0] 424 states_np_tomove: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np] 425 actions_rand: List[A] = self._get_state_np_action_rand(states_np_tomove) 426 427 states_moved, tcs = self._next_state_np(states_np_tomove, actions_rand) 428 429 for l_idx in range(len(states_np)): 430 states_np[l_idx][idxs] = states_moved[l_idx] 431 idx: int 432 for act_idx, idx in enumerate(idxs): 433 path_costs[idx] += tcs[act_idx] 434 435 num_steps_curr[idxs] = num_steps_curr[idxs] + 1 436 437 steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs] 438 439 return self._np_to_states(states_np), path_costs
Perform a random walk on the given states for the given number of steps
Parameters
- states: List of states
- num_steps_l: number of steps to take for each state
Returns
The resulting state and the path cost for each random walk
class NextStateNPActsEnum(NextStateNP[S, A, G], ActsEnum[S, A, G], ABC):
    """ Numpy-based domain with enumerable actions; expand stays in the numpy
    representation for the whole batched loop, converting back only per round.
    """
    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Generate all children for the state, assumes there is at least one child state

        :param states: List of states
        :return: Children of each state, actions, transition costs for each state
        """
        # initialize
        states_np: List[NDArray] = self._states_to_np(states)
        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
        state_actions: List[List[A]] = self.get_state_actions(states)

        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot

        # for each move, get next states, transition costs, and if solved
        while np.any(actions_lt):
            # rows of states that still have untried actions
            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
            states_np_idxs: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]

            # next state
            states_next_np, tcs_move = self._next_state_np(states_np_idxs, actions_idxs)
            states_next: List[S] = self._np_to_states(states_next_np)

            # append
            idx: int
            for exp_idx, idx in enumerate(idxs):
                states_exp_l[idx].append(states_next[exp_idx])
                actions_exp_l[idx].append(actions_idxs[exp_idx])
                tcs_l[idx].append(tcs_move[exp_idx])

            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]

        return states_exp_l, actions_exp_l, tcs_l
Helper class that provides a standard way to create an ABC using inheritance.
472 def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]: 473 # initialize 474 states_np: List[NDArray] = self._states_to_np(states) 475 states_exp_l: List[List[S]] = [[] for _ in range(len(states))] 476 actions_exp_l: List[List[A]] = [[] for _ in range(len(states))] 477 tcs_l: List[List[float]] = [[] for _ in range(len(states))] 478 state_actions: List[List[A]] = self.get_state_actions(states) 479 480 num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions]) 481 num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int) 482 actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot 483 484 # for each move, get next states, transition costs, and if solved 485 while np.any(actions_lt): 486 idxs: NDArray[np.int_] = np.where(actions_lt)[0] 487 states_np_idxs: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np] 488 actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs] 489 490 # next state 491 states_next_np, tcs_move = self._next_state_np(states_np_idxs, actions_idxs) 492 states_next: List[S] = self._np_to_states(states_next_np) 493 494 # append 495 idx: int 496 for exp_idx, idx in enumerate(idxs): 497 states_exp_l[idx].append(states_next[exp_idx]) 498 actions_exp_l[idx].append(actions_idxs[exp_idx]) 499 tcs_l[idx].append(tcs_move[exp_idx]) 500 501 num_actions_taken[idxs] = num_actions_taken[idxs] + 1 502 actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs] 503 504 return states_exp_l, actions_exp_l, tcs_l
Generate all children for the state, assumes there is at least one child state
Parameters
- states: List of states
Returns
Children of each state, actions, transition costs for each state
class NextStateNPActsEnumFixed(NextStateNPActsEnum[S, A, G], ActsEnumFixed[S, A, G], ABC):
    """ Numpy-based domain whose fixed action set applies to every state. """

    def _get_state_np_actions(self, states_np: List[NDArray]) -> List[List[A]]:
        """ Give every numpy-represented state its own copy of the fixed action list

        :param states_np: numpy representation of states; the first array's row count
            gives the number of states
        :return: applicable actions per state
        """
        acts: List[A] = self.get_actions_fixed()
        num_states: int = states_np[0].shape[0]
        return [acts.copy() for _ in range(num_states)]
Helper class that provides a standard way to create an ABC using inheritance.
class SupportsPDDL(Domain[S, A, G], ABC):
    """ Domain that can be expressed in PDDL (Planning Domain Definition Language). """
    @abstractmethod
    def get_pddl_domain(self) -> List[str]:
        """ Get the PDDL domain definition

        :return: PDDL domain definition (presumably one line per element — TODO confirm)
        """
        pass

    @abstractmethod
    def state_goal_to_pddl_inst(self, state: S, goal: G) -> List[str]:
        """ Convert a state and goal into a PDDL problem instance

        :param state: start state
        :param goal: goal
        :return: PDDL problem instance (presumably one line per element — TODO confirm)
        """
        pass

    @abstractmethod
    def pddl_action_to_action(self, pddl_action: str) -> A:
        """ Convert a PDDL action string to a domain action

        :param pddl_action: PDDL action string
        :return: corresponding domain action
        """
        pass
Helper class that provides a standard way to create an ABC using inheritance.
class GoalGrndAtoms(GoalSampleable[S, A, G]):
    """ Domain whose states and goals convert to models (sets of ground atoms),
    so goal checks become subset tests and goals can be sampled as random subsets
    of a state's atoms.
    """

    @abstractmethod
    def state_to_model(self, states: List[S]) -> List[Model]:
        """ Convert each state to its model representation

        :param states: List of states
        :return: model for each state
        """
        pass

    @abstractmethod
    def model_to_state(self, models: List[Model]) -> List[S]:
        """ Assumes model is a fully specified state

        :param models: models, each fully specifying one state
        :return: state for each model
        """
        pass

    @abstractmethod
    def goal_to_model(self, goals: List[G]) -> List[Model]:
        """ Convert each goal to its model representation

        :param goals: List of goals
        :return: model for each goal
        """
        pass

    @abstractmethod
    def model_to_goal(self, models: List[Model]) -> List[G]:
        """ Convert each model to a goal

        :param models: models
        :return: goal for each model
        """
        pass

    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
        """ A state is solved when its goal's model is a subset of the state's model

        :param states: List of states
        :param goals: List of goals
        :return: List of booleans where the element at index i corresponds to whether
            the state at index i is solved
        """
        models_g: List[Model] = self.goal_to_model(goals)
        models_s: List[Model] = self.state_to_model(states)
        return [model_g.issubset(model_s) for model_s, model_g in zip(models_s, models_g)]

    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Sample a goal for each goal state by keeping a random subset of its atoms

        :param states_start: List of start states (not used by this implementation)
        :param states_goal: List of states from which goals will be sampled
        :return: Goals
        """
        models_s: List[Model] = self.state_to_model(states_goal)
        # one independent keep probability per goal state
        keep_probs: NDArray[np.float64] = np.random.rand(len(states_goal))
        models_g: List[Model] = [frozenset(misc_utils.random_subset(model_s, keep_prob))
                                 for model_s, keep_prob in zip(models_s, keep_probs)]
        return self.model_to_goal(models_g)

    @abstractmethod
    def get_bk(self) -> List[str]:
        """ Get background knowledge, each element in the list is a line

        :return: background knowledge lines
        """
        pass

    @abstractmethod
    def get_ground_atoms(self) -> List[Atom]:
        """ Get all possible ground atoms that can be used to make a state

        :return: all ground atoms
        """
        pass

    @abstractmethod
    def on_model(self, m: ModelCl) -> Model:
        """ Process results from clingo

        :param m: clingo model
        :return: model representation
        """
        pass

    @abstractmethod
    def start_state_fixed(self, states: List[S]) -> List[Model]:
        """ Given the start state, what must also be true for the goal state (i.e. immovable walls)

        :param states: List of start states
        :return: model of atoms that must also hold in the goal, per state
        """
        pass
Can sample goals from states
532 @abstractmethod 533 def model_to_state(self, models: List[Model]) -> List[S]: 534 """ Assumes model is a fully specified state 535 536 :param models: 537 :return: 538 """ 539 pass
Assumes model is a fully specified state
Parameters
- models:
Returns
549 def is_solved(self, states: List[S], goals: List[G]) -> List[bool]: 550 """ Returns whether or not state is solved 551 552 :param states: List of states 553 :param goals: List of goals 554 :return: Boolean numpy array where the element at index i corresponds to whether or not the 555 state at index i is solved 556 """ 557 models_g: List[Model] = self.goal_to_model(goals) 558 is_solved_l: List[bool] = [] 559 models_s: List[Model] = self.state_to_model(states) 560 for model_state, model_goal in zip(models_s, models_g): 561 is_solved_l.append(model_goal.issubset(model_state)) 562 563 return is_solved_l
Returns whether or not state is solved
Parameters
- states: List of states
- goals: List of goals
Returns
Boolean numpy array where the element at index i corresponds to whether or not the state at index i is solved
565 def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]: 566 models_g: List[Model] = [] 567 568 models_s: List[Model] = self.state_to_model(states_goal) 569 keep_probs: NDArray[np.float64] = np.random.rand(len(states_goal)) 570 for model_s, keep_prob in zip(models_s, keep_probs): 571 rand_subset: Set[Atom] = misc_utils.random_subset(model_s, keep_prob) 572 models_g.append(frozenset(rand_subset)) 573 574 return self.model_to_goal(models_g)
Given a state, return a goal that represents a set of goal states of which the given state is a member. Does not have to always return the same goal.
Parameters
- states_start: List of start states
- states_goal: List of states from which goals will be sampled
Returns
Goals
576 @abstractmethod 577 def get_bk(self) -> List[str]: 578 """ get background, each element in list is a line 579 580 :return: 581 """ 582 pass
get background, each element in list is a line
Returns
584 @abstractmethod 585 def get_ground_atoms(self) -> List[Atom]: 586 """ Get all possible ground atoms that can be used to make a state 587 588 :return: 589 """ 590 pass
Get all possible ground atoms that can be used to make a state
Returns
592 @abstractmethod 593 def on_model(self, m: ModelCl) -> Model: 594 """ Process results from clingo 595 596 :param m: 597 :return: 598 """ 599 pass
Process results from clingo
Parameters
- m:
Returns
601 @abstractmethod 602 def start_state_fixed(self, states: List[S]) -> List[Model]: 603 """ Given the start state, what must also be true for the goal state (i.e. immovable walls) 604 605 :param states: 606 :return: 607 """ 608 pass
Given the start state, what must also be true for the goal state (i.e. immovable walls)
Parameters
- states: