deepxube.base.domain

  1from abc import ABC, abstractmethod
  2from typing import List, Tuple, Optional, Set, TypeVar, Generic, Dict, Any
  3import numpy as np
  4from clingo.solving import Model as ModelCl
  5
  6from deepxube.logic.logic_objects import Atom, Model
  7from deepxube.utils import misc_utils
  8from deepxube.nnet.nnet_utils import NNetPar, NNetCallable
  9from deepxube.utils.timing_utils import Times
 10from matplotlib.figure import Figure
 11import random
 12import time
 13from numpy.typing import NDArray
 14
 15
 16class State(ABC):
 17    """ State object
 18
 19    """
 20    @abstractmethod
 21    def __hash__(self) -> int:
 22        """ For use in CLOSED dictionary for pathfinding
 23        :return: hash value
 24        """
 25        pass
 26
 27    @abstractmethod
 28    def __eq__(self, other: object) -> bool:
 29        """ for use in state reidentification during pathfinding
 30
 31        :param other: other state
 32        :return: true if they are equal
 33        """
 34        pass
 35
 36
 37class Action(ABC):
 38    """ Action object
 39
 40    """
 41
 42    @abstractmethod
 43    def __hash__(self) -> int:
 44        """ For use in backup for Q* search
 45        :return: hash value
 46        """
 47        pass
 48
 49    @abstractmethod
 50    def __eq__(self, other: object) -> bool:
 51        """ for use in backup for Q* search
 52
 53        :param other: other state
 54        :return: true if they are equal
 55        """
 56        pass
 57
 58
 59class Goal(ABC):
 60    """ Goal object that represents a set of states considered goal states
 61
 62    """
 63    pass
 64
 65
# Type variables used to parameterize Domain and its mixins by the concrete
# state, action, and goal types of an implementation.
S = TypeVar('S', bound=State)
A = TypeVar('A', bound=Action)
G = TypeVar('G', bound=Goal)
 69
 70
 71# TODO method for downloading data?
 72class Domain(ABC, Generic[S, A, G]):
 73    def __init__(self, *args: Any, **kwargs: Any) -> None:
 74        self.nnet_pars: List[Tuple[str, str, NNetPar]] = []
 75
 76    @abstractmethod
 77    def get_start_goal_pairs(self, num_steps_l: List[int],
 78                             times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
 79        """ Return start goal pairs with num_steps_l between start and goal
 80
 81        :param num_steps_l: Number of steps to take between start and goal
 82        :param times: Times that can be used to profile code
 83        :return: List of start states and list of goals
 84        """
 85        pass
 86
 87    @abstractmethod
 88    def get_state_action_rand(self, states: List[S]) -> List[A]:
 89        """ Get a random action that is applicable to the current state
 90
 91        :param states: List of states
 92        :return: List of random actions applicable to given states
 93        """
 94        pass
 95
 96    @abstractmethod
 97    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
 98        """ Get the next state and transition cost given the current state and action
 99
100        :param states: List of states
101        :param actions: List of actions to take
102        :return: Next states, transition costs
103        """
104        pass
105
106    @abstractmethod
107    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
108        """ Returns true if the state is a member of the set of goal states represented by the goal
109
110        :param states: List of states
111        :param goals: List of goals
112        :return: List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states
113        represented by the goal at index i
114        """
115        pass
116
117    def next_state_rand(self, states: List[S]) -> Tuple[List[S], List[float]]:
118        """ Get random next state and transition cost given the current state
119
120        :param states: List of states
121        :return: Next states, transition costs
122        """
123        actions_rand: List[A] = self.get_state_action_rand(states)
124        return self.next_state(states, actions_rand)
125
126    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
127        """ Perform a random walk on the given states for the given number of steps
128
129        :param states: List of states
130        :param num_steps_l: number of steps to take for each state
131        :return: The resulting state and the path cost for each random walk
132        """
133        states_walk: List[S] = [state for state in states]
134        path_costs: List[float] = [0.0 for _ in states]
135
136        num_steps: NDArray[np.int_] = np.array(num_steps_l)
137        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
138        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
139        while np.any(steps_lt):
140            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
141            states_to_move = [states_walk[idx] for idx in idxs]
142
143            states_moved, tcs = self.next_state_rand(states_to_move)
144
145            idx: int
146            for move_idx, idx in enumerate(idxs):
147                states_walk[idx] = states_moved[move_idx]
148                path_costs[idx] += tcs[move_idx]
149
150            num_steps_curr[idxs] = num_steps_curr[idxs] + 1
151
152            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]
153
154        return states_walk, path_costs
155
156    def get_nnet_pars(self) -> List[Tuple[str, str, NNetPar]]:
157        return self.nnet_pars
158
159    def set_nnet_fns(self, nnet_fn_dict: Dict[str, NNetCallable]) -> None:
160        pass
161
162
163# Visualization Mixins
class StateGoalVizable(Domain[S, A, G]):
    """ Mixin for domains that can draw a (state, goal) problem instance on a matplotlib figure """

    @abstractmethod
    def visualize_state_goal(self, state: S, goal: G, fig: Figure) -> None:
        """ Render the given state/goal pair onto fig """
        pass
171
172
class StringToAct(Domain[S, A, G]):
    """ Mixin for domains that can parse an action from a string; used when
    visualizing problem instances.
    """

    @abstractmethod
    def string_to_action(self, act_str: str) -> Optional[A]:
        """ Parse an action from its string representation

        :param act_str: A string representation of an action
        :return: the action if act_str is a valid representation, None otherwise
        """
        pass
184
185
class ActsFixed(Domain[S, A, G]):
    """ Mixin for domains where random action sampling does not depend on the state """

    @abstractmethod
    def get_action_rand(self, num: int) -> List[A]:
        """ Sample num random actions (state-independent) """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        """ One random action per state; sampling ignores the states themselves """
        return self.get_action_rand(len(states))
193
194
class ActsRev(Domain[S, A, G], ABC):
    """ Mixin for domains whose actions are reversible """

    @abstractmethod
    def rev_action(self, actions: List[A]) -> List[A]:
        """ Reverse each given action

        :param actions: List of actions
        :return: reverse of each given action
        """
        pass

    @abstractmethod
    def rev_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Transition along each directed edge in the reverse direction

        :param states: List of states
        :param actions: List of actions to take
        :return: reverse states and transition costs (weights of the edges taken in reverse)
        """
        pass
217
218
class ActsEnum(Domain[S, A, G]):
    """ Mixin for domains where the actions applicable to a state can be enumerated """

    @abstractmethod
    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Get the actions applicable to each state

        :param states: List of states
        :return: Applicable actions per state
        """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        """ Pick one applicable action uniformly at random for each state """
        return [random.choice(acts) for acts in self.get_state_actions(states)]

    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Generate all children of each state; assumes every state has at least one child.

        next_state is called in batched rounds: round k expands the k-th remaining
        action of every state that still has one, so batches shrink as states run
        out of applicable actions.

        :param states: List of states
        :return: children of each state, actions, transition costs for each state
        """
        num_states: int = len(states)
        children_l: List[List[S]] = [[] for _ in range(num_states)]
        acts_taken_l: List[List[A]] = [[] for _ in range(num_states)]
        costs_l: List[List[float]] = [[] for _ in range(num_states)]
        remaining: List[List[A]] = self.get_state_actions(states)

        acts_total: NDArray[np.int_] = np.array([len(acts) for acts in remaining])
        acts_done: NDArray[np.int_] = np.zeros(num_states, dtype=int)
        active: NDArray[np.bool_] = acts_done < acts_total

        while np.any(active):
            # states that still have untried actions
            active_idxs: NDArray[np.int_] = np.where(active)[0]
            batch_states: List[S] = [states[i] for i in active_idxs]
            # consume the next untried action of each active state
            batch_acts: List[A] = [remaining[i].pop(0) for i in active_idxs]

            batch_children, batch_costs = self.next_state(batch_states, batch_acts)

            for child, act, cost, i in zip(batch_children, batch_acts, batch_costs, active_idxs):
                children_l[i].append(child)
                acts_taken_l[i].append(act)
                costs_l[i].append(cost)

            acts_done[active_idxs] += 1
            active[active_idxs] = acts_done[active_idxs] < acts_total[active_idxs]

        return children_l, acts_taken_l, costs_l
269
270
class ActsEnumFixed(ActsEnum[S, A, G], ActsFixed[S, A, G]):
    """ Mixin for domains with a fixed, enumerable action set shared by all states """

    def get_action_rand(self, num: int) -> List[A]:
        """ Sample num actions uniformly from the fixed action set """
        fixed: List[A] = self.get_actions_fixed()
        return [random.choice(fixed) for _ in range(num)]

    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Each state gets its own copy of the fixed action list (expand pops from it) """
        return [self.get_actions_fixed().copy() for _ in states]

    @abstractmethod
    def get_actions_fixed(self) -> List[A]:
        """ The fixed list of all actions in the domain """
        pass

    def get_num_acts(self) -> int:
        """ Number of actions in the fixed action set """
        return len(self.get_actions_fixed())
285
286
287# Goal mixins
class GoalSampleable(Domain[S, A, G]):
    """ Mixin for domains that can sample goals from states """

    @abstractmethod
    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ For each goal state, return a goal representing a set of goal states of
        which that state is a member. Need not always return the same goal.

        :param states_start: List of start states
        :param states_goal: List of states from which goals will be sampled
        :return: Goals
        """
        pass
300
301
class GoalStateSampleable(Domain[S, A, G]):
    """ Mixin for domains that can sample member states from goals """

    @abstractmethod
    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        """ Sample states that are members of each goal

        :param goals: List of goals
        :param num_states_l: how many states to sample for the corresponding goal
        :return: one list of sampled states per goal
        """
        pass
313
314
class GoalFixed(Domain[S, A, G]):
    """ Mixin for domains whose goal is the same for every problem instance """

    @abstractmethod
    def get_goal(self) -> G:
        """
        :return: the fixed goal shared by all problem instances
        """
        pass
323
324
class StartGoalWalkable(GoalSampleable[S, A, G]):
    """ Mixin: generate start states, random-walk them forward, and sample goals
    from the walk endpoints.
    """

    @abstractmethod
    def get_start_states(self, num_states: int) -> List[S]:
        """ Generate start states. Should be as diverse as possible so that the
        trained heuristic function generalizes well.

        :param num_states: Number of states to get
        :return: Generated states
        """
        pass

    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        if times is None:
            times = Times()

        # start states
        time_start = time.time()
        states_start: List[S] = self.get_start_states(len(num_steps_l))
        times.record_time("get_start_states", time.time() - time_start)

        # walk forward to obtain goal states
        time_start = time.time()
        states_goal: List[S]
        states_goal, _ = self.random_walk(states_start, num_steps_l)
        times.record_time("random_walk", time.time() - time_start)

        # turn the walk endpoints into goals
        time_start = time.time()
        goals: List[G] = self.sample_goal(states_start, states_goal)
        times.record_time("sample_goal", time.time() - time_start)

        return states_start, goals
358
359
class GoalStateSampleableFixed(GoalStateSampleable[S, A, G], GoalFixed[S, A, G]):
    """ Can sample member states of the fixed goal shared by all problem instances """

    @abstractmethod
    def sample_goal_states_fixed(self, num_states: int) -> List[S]:
        """ Sample num_states states that are members of the fixed goal """
        pass

    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        # the goals argument is ignored: every goal is the fixed goal
        return [self.sample_goal_states_fixed(num) for num in num_states_l]
369
370
371# reverse walks
class FixedGoalRevWalk(GoalStateSampleableFixed[S, A, G]):
    """ Mixin: sample goal states for the fixed goal and walk backwards from them
    to obtain start states.
    """

    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        if times is None:
            times = Times()

        # goal states (timing key mirrors the forward-walk variant)
        time_start = time.time()
        states_goal: List[S] = self.sample_goal_states_fixed(len(num_steps_l))
        times.record_time("get_start_states", time.time() - time_start)

        # reverse walk back to start states
        time_start = time.time()
        states_start: List[S] = self.random_walk_rev(states_goal, num_steps_l)
        times.record_time("random_walk", time.time() - time_start)

        # every problem instance shares the single fixed goal
        time_start = time.time()
        goals: List[G] = [self.get_goal()] * len(states_start)
        times.record_time("sample_goal", time.time() - time_start)

        return states_start, goals

    @abstractmethod
    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        """ Walk backwards from each state for the given number of steps """
        pass
398
399
class FixedGoalRevWalkActsRev(FixedGoalRevWalk[S, A, G], ActsRev[S, A, G], ABC):
    """ Reverse walks implemented as ordinary random walks; presumably valid because
    this domain's actions are reversible (ActsRev) — TODO confirm.
    """

    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        states_walked, _ = self.random_walk(states, num_steps_l)
        return states_walked
403
404
405# numpy convenience mixins
class NextStateNP(Domain[S, A, G]):
    """ Convenience mixin: subclasses provide a numpy representation of states and a
    numpy transition function; next_state and random_walk are implemented on top of them.
    """
    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Apply actions by converting to numpy, transitioning, and converting back """
        states_np: List[NDArray] = self._states_to_np(states)
        states_next_np, tcs = self._next_state_np(states_np, actions)
        states_next: List[S] = self._np_to_states(states_next_np)

        return states_next, tcs

    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
        """ Random walk in the numpy representation; mirrors Domain.random_walk but
        only converts back to states once all walks have finished
        """
        states_np = self._states_to_np(states)
        path_costs: List[float] = [0.0 for _ in states]

        num_steps: NDArray[np.int_] = np.array(num_steps_l)
        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
        while np.any(steps_lt):
            # indices of walks that still need steps
            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
            # fancy indexing copies the active rows out of each array
            states_np_tomove: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            actions_rand: List[A] = self._get_state_np_action_rand(states_np_tomove)

            states_moved, tcs = self._next_state_np(states_np_tomove, actions_rand)

            # write the moved rows back into the full arrays in place
            for l_idx in range(len(states_np)):
                states_np[l_idx][idxs] = states_moved[l_idx]
            idx: int
            for act_idx, idx in enumerate(idxs):
                path_costs[idx] += tcs[act_idx]

            num_steps_curr[idxs] = num_steps_curr[idxs] + 1

            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]

        return self._np_to_states(states_np), path_costs

    @abstractmethod
    def _states_to_np(self, states: List[S]) -> List[NDArray]:
        """ Convert states to their numpy representation (see _next_state_np) """
        pass

    @abstractmethod
    def _np_to_states(self, states_np_l: List[NDArray]) -> List[S]:
        """ Convert the numpy representation back to state objects """
        pass

    @abstractmethod
    def _get_state_np_actions(self, states_np_l: List[NDArray]) -> List[List[A]]:
        """ Applicable actions per state, given the numpy representation """
        pass

    def _get_state_np_action_rand(self, states_np: List[NDArray]) -> List[A]:
        """ One uniformly random applicable action per state in the numpy representation """
        state_actions_l: List[List[A]] = self._get_state_np_actions(states_np)
        return [random.choice(state_actions) for state_actions in state_actions_l]

    @abstractmethod
    def _next_state_np(self, states_np: List[NDArray], actions: List[A]) -> Tuple[List[NDArray], List[float]]:
        """ Get the next state and transition cost given the current numpy representations of the state and action


        @param states_np: numpy representation of states. Each row in each element of states_np list represents
        information for a different state. There can be one or more multiple elements in the list for each state.
        This object should not be mutated.
        @param actions: actions
        @return: Numpy representation of next states, transition costs
        """
        pass
468
469
class NextStateNPActsEnum(NextStateNP[S, A, G], ActsEnum[S, A, G], ABC):
    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Batched expansion (as in ActsEnum.expand) using the numpy state representation
        for transitions; children are converted back to states each round

        :param states: List of states
        :return: children of each state, actions, transition costs for each state
        """
        # initialize
        states_np: List[NDArray] = self._states_to_np(states)
        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
        state_actions: List[List[A]] = self.get_state_actions(states)

        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot

        # for each move, get next states, transition costs, and if solved
        while np.any(actions_lt):
            # states that still have untried actions
            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
            states_np_idxs: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            # consume the next untried action of each active state
            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]

            # next state
            states_next_np, tcs_move = self._next_state_np(states_np_idxs, actions_idxs)
            states_next: List[S] = self._np_to_states(states_next_np)

            # append
            idx: int
            for exp_idx, idx in enumerate(idxs):
                states_exp_l[idx].append(states_next[exp_idx])
                actions_exp_l[idx].append(actions_idxs[exp_idx])
                tcs_l[idx].append(tcs_move[exp_idx])

            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]

        return states_exp_l, actions_exp_l, tcs_l
504
505
class NextStateNPActsEnumFixed(NextStateNPActsEnum[S, A, G], ActsEnumFixed[S, A, G], ABC):
    """ Fixed-action domains that also use a numpy state representation """

    def _get_state_np_actions(self, states_np: List[NDArray]) -> List[List[A]]:
        """ One copy of the fixed action list per state row in the numpy representation """
        acts_fixed: List[A] = self.get_actions_fixed()
        num_states: int = states_np[0].shape[0]
        return [acts_fixed.copy() for _ in range(num_states)]
510
511
class SupportsPDDL(Domain[S, A, G], ABC):
    """ Mixin for domains that can be expressed in PDDL """

    @abstractmethod
    def get_pddl_domain(self) -> List[str]:
        """ Lines of the PDDL domain definition """
        pass

    @abstractmethod
    def state_goal_to_pddl_inst(self, state: S, goal: G) -> List[str]:
        """ Lines of a PDDL problem instance for the given state and goal """
        pass

    @abstractmethod
    def pddl_action_to_action(self, pddl_action: str) -> A:
        """ Parse a PDDL action string into a domain action """
        pass
524
525
class GoalGrndAtoms(GoalSampleable[S, A, G]):
    """ Mixin for domains whose states and goals can be represented as logic models
    (sets of ground atoms); goals are sampled as random atom subsets of goal-state models.
    """

    @abstractmethod
    def state_to_model(self, states: List[S]) -> List[Model]:
        """ Convert each state to its logic model """
        pass

    @abstractmethod
    def model_to_state(self, models: List[Model]) -> List[S]:
        """ Convert models back to states

        :param models: models, each assumed to fully specify a state
        :return: states
        """
        pass

    @abstractmethod
    def goal_to_model(self, goals: List[G]) -> List[Model]:
        """ Convert each goal to its logic model """
        pass

    @abstractmethod
    def model_to_goal(self, models: List[Model]) -> List[G]:
        """ Convert each logic model to a goal """
        pass

    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
        """ A state is solved when its model contains every atom of the goal's model

        :param states: List of states
        :param goals: List of goals
        :return: element i is True iff the state at index i satisfies the goal at index i
        """
        models_g: List[Model] = self.goal_to_model(goals)
        models_s: List[Model] = self.state_to_model(states)
        return [model_g.issubset(model_s) for model_s, model_g in zip(models_s, models_g)]

    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Sample a goal for each goal state by keeping a random subset of its atoms.
        states_start is unused here; it is part of the GoalSampleable interface.
        """
        models_s: List[Model] = self.state_to_model(states_goal)
        # one independent keep-probability per goal state
        keep_probs: NDArray[np.float64] = np.random.rand(len(states_goal))
        models_g: List[Model] = [frozenset(misc_utils.random_subset(model_s, keep_prob))
                                 for model_s, keep_prob in zip(models_s, keep_probs)]

        return self.model_to_goal(models_g)

    @abstractmethod
    def get_bk(self) -> List[str]:
        """ Background knowledge, one line per list element

        :return: lines of background knowledge
        """
        pass

    @abstractmethod
    def get_ground_atoms(self) -> List[Atom]:
        """ All possible ground atoms that can be used to make a state

        :return: ground atoms
        """
        pass

    @abstractmethod
    def on_model(self, m: ModelCl) -> Model:
        """ Process a model returned by clingo

        :param m: clingo model
        :return: logic model
        """
        pass

    @abstractmethod
    def start_state_fixed(self, states: List[S]) -> List[Model]:
        """ Given the start state, what must also be true for the goal state
        (i.e. immovable walls)

        :param states: start states
        :return: model of fixed atoms per state
        """
        pass
class State(abc.ABC):
17class State(ABC):
18    """ State object
19
20    """
21    @abstractmethod
22    def __hash__(self) -> int:
23        """ For use in CLOSED dictionary for pathfinding
24        :return: hash value
25        """
26        pass
27
28    @abstractmethod
29    def __eq__(self, other: object) -> bool:
30        """ for use in state reidentification during pathfinding
31
32        :param other: other state
33        :return: true if they are equal
34        """
35        pass

State object

class Action(abc.ABC):
38class Action(ABC):
39    """ Action object
40
41    """
42
43    @abstractmethod
44    def __hash__(self) -> int:
45        """ For use in backup for Q* search
46        :return: hash value
47        """
48        pass
49
50    @abstractmethod
51    def __eq__(self, other: object) -> bool:
52        """ for use in backup for Q* search
53
54        :param other: other state
55        :return: true if they are equal
56        """
57        pass

Action object

class Goal(abc.ABC):
60class Goal(ABC):
61    """ Goal object that represents a set of states considered goal states
62
63    """
64    pass

Goal object that represents a set of states considered goal states

class Domain(abc.ABC, typing.Generic[~S, ~A, ~G]):
 73class Domain(ABC, Generic[S, A, G]):
 74    def __init__(self, *args: Any, **kwargs: Any) -> None:
 75        self.nnet_pars: List[Tuple[str, str, NNetPar]] = []
 76
 77    @abstractmethod
 78    def get_start_goal_pairs(self, num_steps_l: List[int],
 79                             times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
 80        """ Return start goal pairs with num_steps_l between start and goal
 81
 82        :param num_steps_l: Number of steps to take between start and goal
 83        :param times: Times that can be used to profile code
 84        :return: List of start states and list of goals
 85        """
 86        pass
 87
 88    @abstractmethod
 89    def get_state_action_rand(self, states: List[S]) -> List[A]:
 90        """ Get a random action that is applicable to the current state
 91
 92        :param states: List of states
 93        :return: List of random actions applicable to given states
 94        """
 95        pass
 96
 97    @abstractmethod
 98    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
 99        """ Get the next state and transition cost given the current state and action
100
101        :param states: List of states
102        :param actions: List of actions to take
103        :return: Next states, transition costs
104        """
105        pass
106
107    @abstractmethod
108    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
109        """ Returns true if the state is a member of the set of goal states represented by the goal
110
111        :param states: List of states
112        :param goals: List of goals
113        :return: List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states
114        represented by the goal at index i
115        """
116        pass
117
118    def next_state_rand(self, states: List[S]) -> Tuple[List[S], List[float]]:
119        """ Get random next state and transition cost given the current state
120
121        :param states: List of states
122        :return: Next states, transition costs
123        """
124        actions_rand: List[A] = self.get_state_action_rand(states)
125        return self.next_state(states, actions_rand)
126
127    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
128        """ Perform a random walk on the given states for the given number of steps
129
130        :param states: List of states
131        :param num_steps_l: number of steps to take for each state
132        :return: The resulting state and the path cost for each random walk
133        """
134        states_walk: List[S] = [state for state in states]
135        path_costs: List[float] = [0.0 for _ in states]
136
137        num_steps: NDArray[np.int_] = np.array(num_steps_l)
138        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
139        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
140        while np.any(steps_lt):
141            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
142            states_to_move = [states_walk[idx] for idx in idxs]
143
144            states_moved, tcs = self.next_state_rand(states_to_move)
145
146            idx: int
147            for move_idx, idx in enumerate(idxs):
148                states_walk[idx] = states_moved[move_idx]
149                path_costs[idx] += tcs[move_idx]
150
151            num_steps_curr[idxs] = num_steps_curr[idxs] + 1
152
153            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]
154
155        return states_walk, path_costs
156
157    def get_nnet_pars(self) -> List[Tuple[str, str, NNetPar]]:
158        return self.nnet_pars
159
160    def set_nnet_fns(self, nnet_fn_dict: Dict[str, NNetCallable]) -> None:
161        pass

Abstract base class for pathfinding domains, generic over state (S), action (A), and goal (G) types; subclasses implement state transitions, goal checks, and start/goal pair generation.

nnet_pars: List[Tuple[str, str, deepxube.nnet.nnet_utils.NNetPar]]
@abstractmethod
def get_start_goal_pairs( self, num_steps_l: List[int], times: Optional[deepxube.utils.timing_utils.Times] = None) -> Tuple[List[~S], List[~G]]:
77    @abstractmethod
78    def get_start_goal_pairs(self, num_steps_l: List[int],
79                             times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
80        """ Return start goal pairs with num_steps_l between start and goal
81
82        :param num_steps_l: Number of steps to take between start and goal
83        :param times: Times that can be used to profile code
84        :return: List of start states and list of goals
85        """
86        pass

Return start goal pairs with num_steps_l between start and goal

Parameters
  • num_steps_l: Number of steps to take between start and goal
  • times: Times that can be used to profile code
Returns

List of start states and list of goals

@abstractmethod
def get_state_action_rand(self, states: List[~S]) -> List[~A]:
88    @abstractmethod
89    def get_state_action_rand(self, states: List[S]) -> List[A]:
90        """ Get a random action that is applicable to the current state
91
92        :param states: List of states
93        :return: List of random actions applicable to given states
94        """
95        pass

Get a random action that is applicable to the current state

Parameters
  • states: List of states
Returns

List of random actions applicable to given states

@abstractmethod
def next_state( self, states: List[~S], actions: List[~A]) -> Tuple[List[~S], List[float]]:
 97    @abstractmethod
 98    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
 99        """ Get the next state and transition cost given the current state and action
100
101        :param states: List of states
102        :param actions: List of actions to take
103        :return: Next states, transition costs
104        """
105        pass

Get the next state and transition cost given the current state and action

Parameters
  • states: List of states
  • actions: List of actions to take
Returns

Next states, transition costs

@abstractmethod
def is_solved(self, states: List[~S], goals: List[~G]) -> List[bool]:
107    @abstractmethod
108    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
109        """ Returns true if the state is a member of the set of goal states represented by the goal
110
111        :param states: List of states
112        :param goals: List of goals
113        :return: List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states
114        represented by the goal at index i
115        """
116        pass

Returns true if the state is a member of the set of goal states represented by the goal

Parameters
  • states: List of states
  • goals: List of goals
Returns

List of booleans where the element at index i corresponds to whether or not the state at index i is a member of the set of goal states represented by the goal at index i

def next_state_rand(self, states: List[~S]) -> Tuple[List[~S], List[float]]:
118    def next_state_rand(self, states: List[S]) -> Tuple[List[S], List[float]]:
119        """ Get random next state and transition cost given the current state
120
121        :param states: List of states
122        :return: Next states, transition costs
123        """
124        actions_rand: List[A] = self.get_state_action_rand(states)
125        return self.next_state(states, actions_rand)

Get random next state and transition cost given the current state

Parameters
  • states: List of states
Returns

Next states, transition costs

def random_walk( self, states: List[~S], num_steps_l: List[int]) -> Tuple[List[~S], List[float]]:
    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
        """ Perform a random walk on the given states for the given number of steps.

        Walks are batched: on every iteration one random action is applied to every walk
        that still has steps remaining.

        :param states: List of states
        :param num_steps_l: number of steps to take for each state
        :return: The resulting state and the path cost (sum of transition costs) for each random walk
        """
        # shallow copy so the caller's list is not mutated; entries are replaced in place below
        states_walk: List[S] = [state for state in states]
        path_costs: List[float] = [0.0 for _ in states]

        num_steps: NDArray[np.int_] = np.array(num_steps_l)
        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
        while np.any(steps_lt):
            # indices of walks that still have steps remaining; next_state_rand is batched over them
            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
            states_to_move = [states_walk[idx] for idx in idxs]

            states_moved, tcs = self.next_state_rand(states_to_move)

            # write moved states and accumulated costs back to their original positions
            idx: int
            for move_idx, idx in enumerate(idxs):
                states_walk[idx] = states_moved[move_idx]
                path_costs[idx] += tcs[move_idx]

            num_steps_curr[idxs] = num_steps_curr[idxs] + 1

            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]

        return states_walk, path_costs

Perform a random walk on the given states for the given number of steps

Parameters
  • states: List of states
  • num_steps_l: number of steps to take for each state
Returns

The resulting state and the path cost for each random walk

def get_nnet_pars(self) -> List[Tuple[str, str, deepxube.nnet.nnet_utils.NNetPar]]:
    def get_nnet_pars(self) -> List[Tuple[str, str, NNetPar]]:
        """ Get the neural network parameter tuples registered in self.nnet_pars.

        NOTE(review): the meaning of the two string elements of each tuple is not
        evident from this file -- confirm against how NNetPar entries are added.

        :return: the list stored on self.nnet_pars (not a copy)
        """
        return self.nnet_pars
def set_nnet_fns(self, nnet_fn_dict: Dict[str, Callable[..., Any]]) -> None:
    def set_nnet_fns(self, nnet_fn_dict: Dict[str, NNetCallable]) -> None:
        """ Hook for receiving callable neural network functions.

        No-op by default; subclasses that need the networks can override.
        Keys are presumably the names registered via self.nnet_pars -- confirm with callers.

        :param nnet_fn_dict: mapping from name to network callable
        """
        pass
class StateGoalVizable(deepxube.base.domain.Domain[~S, ~A, ~G]):
class StateGoalVizable(Domain[S, A, G]):
    """ Can visualize problem instances

    """
    @abstractmethod
    def visualize_state_goal(self, state: S, goal: G, fig: Figure) -> None:
        """ Draw the given state and goal onto the given matplotlib figure.

        :param state: state to visualize
        :param goal: goal to visualize
        :param fig: matplotlib Figure to draw on
        """
        pass

Can visualize problem instances

@abstractmethod
def visualize_state_goal(self, state: ~S, goal: ~G, fig: matplotlib.figure.Figure) -> None:
169    @abstractmethod
170    def visualize_state_goal(self, state: S, goal: G, fig: Figure) -> None:
171        pass
class StringToAct(deepxube.base.domain.Domain[~S, ~A, ~G]):
class StringToAct(Domain[S, A, G]):
    """ Can get an action from a string. Used when visualizing problem instances.

    """
    @abstractmethod
    def string_to_action(self, act_str: str) -> Optional[A]:
        """ Parse an action from its string representation.

        :param act_str: A string representation of an action
        :return: The action represented by the string, if it is a valid representation, None otherwise
        """
        pass

Can get an action from a string. Used when visualizing problem instances.

@abstractmethod
def string_to_action(self, act_str: str) -> Optional[~A]:
178    @abstractmethod
179    def string_to_action(self, act_str: str) -> Optional[A]:
180        """
181        :param act_str: A string representation of an action
182        :return: The action represented by the string, if it is a valid representation, None otherwise
183        """
184        pass
Parameters
  • act_str: A string representation of an action
Returns

The action represented by the string, if it is a valid representation, None otherwise

class ActsFixed(deepxube.base.domain.Domain[~S, ~A, ~G]):
class ActsFixed(Domain[S, A, G]):
    """ Applicable actions do not depend on the state: random actions are sampled
    using only the number of states requested.
    """
    @abstractmethod
    def get_action_rand(self, num: int) -> List[A]:
        """ Get random actions.

        :param num: number of random actions to sample
        :return: List of num random actions
        """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        """ Get one random action per state; only len(states) is used. """
        return self.get_action_rand(len(states))

Domain whose applicable actions do not depend on the state: random actions are drawn from the domain itself, using only the number of states.

@abstractmethod
def get_action_rand(self, num: int) -> List[~A]:
188    @abstractmethod
189    def get_action_rand(self, num: int) -> List[A]:
190        pass
def get_state_action_rand(self, states: List[~S]) -> List[~A]:
192    def get_state_action_rand(self, states: List[S]) -> List[A]:
193        return self.get_action_rand(len(states))

Get a random action that is applicable to the current state

Parameters
  • states: List of states
Returns

List of random actions applicable to given states

class ActsRev(deepxube.base.domain.Domain[~S, ~A, ~G], abc.ABC):
class ActsRev(Domain[S, A, G], ABC):
    """ Actions are reversible.

    """
    @abstractmethod
    def rev_action(self, actions: List[A]) -> List[A]:
        """ Get the reverse of each given action.

        :param actions: List of actions
        :return: Reverse of each given action
        """
        pass

    @abstractmethod
    def rev_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Transition along the directed edge in the reverse direction.

        :param states: List of states
        :param actions: List of actions to take
        :return: Reverse states, transition costs which are weights of edges taken in reverse
        """
        pass

Actions are reversible.

@abstractmethod
def rev_action(self, actions: List[~A]) -> List[~A]:
200    @abstractmethod
201    def rev_action(self, actions: List[A]) -> List[A]:
202        """ Get the reverse of the given action
203
204        :param actions: List of actions
205        :return: Reverse of given action
206        """
207        pass

Get the reverse of the given action

Parameters
  • actions: List of actions
Returns

Reverse of given action

@abstractmethod
def rev_state( self, states: List[~S], actions: List[~A]) -> Tuple[List[~S], List[float]]:
209    @abstractmethod
210    def rev_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
211        """ Transition along the directed edge in the reverse direction.
212
213        :param states: List of states
214        :param actions: List of actions to take
215        :return: Reverse states, transition costs which are weights of edges taken in reverse
216        """
217        pass

Transition along the directed edge in the reverse direction.

Parameters
  • states: List of states
  • actions: List of actions to take
Returns

Reverse states, transition costs which are weights of edges taken in reverse

class ActsEnum(deepxube.base.domain.Domain[~S, ~A, ~G]):
class ActsEnum(Domain[S, A, G]):
    """ The actions applicable to each state can be enumerated, enabling full expansion. """
    @abstractmethod
    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Get actions applicable to each state.

        :param states: List of states
        :return: Applicable actions, one list per state
        """
        pass

    def get_state_action_rand(self, states: List[S]) -> List[A]:
        """ Choose one applicable action uniformly at random for each state. """
        state_actions_l: List[List[A]] = self.get_state_actions(states)
        return [random.choice(state_actions) for state_actions in state_actions_l]

    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Generate all children for the state, assumes there is at least one child state
        :param states: List of states
        :return: Children of each state, actions, transition costs for each state
        """
        # TODO further validate
        # initialize
        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
        state_actions: List[List[A]] = self.get_state_actions(states)

        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot

        # for each move, get next states, transition costs, and if solved
        while np.any(actions_lt):
            # states that still have untried actions; next_state is batched over them
            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
            states_idxs: List[S] = [states[idx] for idx in idxs]
            # pop(0) consumes each state's action list in order
            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]

            # next state
            states_next, tcs_move = self.next_state(states_idxs, actions_idxs)

            # append
            idx: int
            for exp_idx, idx in enumerate(idxs):
                states_exp_l[idx].append(states_next[exp_idx])
                actions_exp_l[idx].append(actions_idxs[exp_idx])
                tcs_l[idx].append(tcs_move[exp_idx])

            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]

        return states_exp_l, actions_exp_l, tcs_l

Domain whose applicable actions can be enumerated for each state, enabling full expansion of a state's children.

@abstractmethod
def get_state_actions(self, states: List[~S]) -> List[List[~A]]:
221    @abstractmethod
222    def get_state_actions(self, states: List[S]) -> List[List[A]]:
223        """ Get actions applicable to each states
224
225        :param states: List of states
226        :return: Applicable actions
227        """
228        pass

Get actions applicable to each states

Parameters
  • states: List of states
Returns

Applicable actions

def get_state_action_rand(self, states: List[~S]) -> List[~A]:
230    def get_state_action_rand(self, states: List[S]) -> List[A]:
231        state_actions_l: List[List[A]] = self.get_state_actions(states)
232        return [random.choice(state_actions) for state_actions in state_actions_l]

Get a random action that is applicable to the current state

Parameters
  • states: List of states
Returns

List of random actions applicable to given states

def expand( self, states: List[~S]) -> Tuple[List[List[~S]], List[List[~A]], List[List[float]]]:
234    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
235        """ Generate all children for the state, assumes there is at least one child state
236        :param states: List of states
237        :return: Children of each state, actions, transition costs for each state
238        """
239        # TODO further validate
240        # initialize
241        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
242        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
243        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
244        state_actions: List[List[A]] = self.get_state_actions(states)
245
246        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
247        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
248        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot
249
250        # for each move, get next states, transition costs, and if solved
251        while np.any(actions_lt):
252            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
253            states_idxs: List[S] = [states[idx] for idx in idxs]
254            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]
255
256            # next state
257            states_next, tcs_move = self.next_state(states_idxs, actions_idxs)
258
259            # append
260            idx: int
261            for exp_idx, idx in enumerate(idxs):
262                states_exp_l[idx].append(states_next[exp_idx])
263                actions_exp_l[idx].append(actions_idxs[exp_idx])
264                tcs_l[idx].append(tcs_move[exp_idx])
265
266            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
267            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]
268
269        return states_exp_l, actions_exp_l, tcs_l

Generate all children for the state, assumes there is at least one child state

Parameters
  • states: List of states
Returns

Children of each state, actions, transition costs for each state

class ActsEnumFixed(ActsEnum[S, A, G], ActsFixed[S, A, G]):
    """ Actions are enumerable and form one fixed set shared by all states. """
    def get_action_rand(self, num: int) -> List[A]:
        """ Sample num actions uniformly (with replacement) from the fixed action set. """
        actions_fixed: List[A] = self.get_actions_fixed()
        return [random.choice(actions_fixed) for _ in range(num)]

    def get_state_actions(self, states: List[S]) -> List[List[A]]:
        """ Every state gets its own copy of the fixed action list (copies because
        callers such as expand() mutate these lists). """
        return [self.get_actions_fixed().copy() for _ in range(len(states))]

    @abstractmethod
    def get_actions_fixed(self) -> List[A]:
        """ :return: the fixed list of all actions for this domain """
        pass

    def get_num_acts(self) -> int:
        """ :return: number of actions in the fixed action set """
        return len(self.get_actions_fixed())

Domain with a single fixed, enumerable action set shared by all states.

def get_action_rand(self, num: int) -> List[~A]:
273    def get_action_rand(self, num: int) -> List[A]:
274        actions_fixed: List[A] = self.get_actions_fixed()
275        return [random.choice(actions_fixed) for _ in range(num)]
def get_state_actions(self, states: List[~S]) -> List[List[~A]]:
277    def get_state_actions(self, states: List[S]) -> List[List[A]]:
278        return [self.get_actions_fixed().copy() for _ in range(len(states))]

Get actions applicable to each states

Parameters
  • states: List of states
Returns

Applicable actions

@abstractmethod
def get_actions_fixed(self) -> List[~A]:
280    @abstractmethod
281    def get_actions_fixed(self) -> List[A]:
282        pass
def get_num_acts(self) -> int:
284    def get_num_acts(self) -> int:
285        return len(self.get_actions_fixed())
class GoalSampleable(deepxube.base.domain.Domain[~S, ~A, ~G]):
class GoalSampleable(Domain[S, A, G]):
    """ Can sample goals from states"""
    @abstractmethod
    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Given a state, return a goal that represents a set of goal states of which the given state is a member.
        Does not have to always return the same goal.

        :param states_start: List of start states
        :param states_goal: List of states from which goals will be sampled
        :return: Goals
        """
        pass

Can sample goals from states

@abstractmethod
def sample_goal(self, states_start: List[~S], states_goal: List[~S]) -> List[~G]:
291    @abstractmethod
292    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
293        """ Given a state, return a goal that represents a set of goal states of which the given state is a member.
294        Does not have to always return the same goal.
295
296        :param states_start: List of start states
297        :param states_goal List of states from which goals will be sampled
298        :return: Goals
299        """
300        pass

Given a state, return a goal that represents a set of goal states of which the given state is a member. Does not have to always return the same goal.

Parameters
  • states_start: List of start states
  • states_goal: List of states from which goals will be sampled
Returns

Goals

class GoalStateSampleable(deepxube.base.domain.Domain[~S, ~A, ~G]):
class GoalStateSampleable(Domain[S, A, G]):
    """ Can sample states from goals """
    @abstractmethod
    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        """ Given a goal, sample states that are members of that goal.

        :param goals: List of goals
        :param num_states_l: List of integers representing how many states to sample for the corresponding goal
        :return: List of list of states, where each element is a list of states sampled for the corresponding goal
        """
        pass

Can sample states from goals

@abstractmethod
def sample_goal_states(self, goals: List[~G], num_states_l: List[int]) -> List[List[~S]]:
305    @abstractmethod
306    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
307        """ Given a goal, sample states that are members of that goal.
308
309        :param goals: List of goals
310        :param num_states_l: List of integers representing how many states to sample for the corresponding goal
311        :return: List of list of states, where each element is a list of states sampled for the corresponding goal
312        """
313        pass

Given a goal, sample states that are members of that goal.

Parameters
  • goals: List of goals
  • num_states_l: List of integers representing how many states to sample for the corresponding goal
Returns

List of list of states, where each element is a list of states sampled for the corresponding goal

class GoalFixed(deepxube.base.domain.Domain[~S, ~A, ~G]):
class GoalFixed(Domain[S, A, G]):
    """ Goal is the same for all problem instances """
    @abstractmethod
    def get_goal(self) -> G:
        """ Get the single goal shared by all problem instances.

        :return: Fixed goal
        """
        pass

Goal is the same for all problem instances

@abstractmethod
def get_goal(self) -> ~G:
318    @abstractmethod
319    def get_goal(self) -> G:
320        """
321        :return: Fixed goal
322        """
323        pass
Returns

Fixed goal

class StartGoalWalkable(deepxube.base.domain.GoalSampleable[~S, ~A, ~G]):
class StartGoalWalkable(GoalSampleable[S, A, G]):
    """ Can sample start states, take actions to obtain another state, and sample a goal from that state"""
    @abstractmethod
    def get_start_states(self, num_states: int) -> List[S]:
        """ A method for generating start states. Should try to make this generate states that are as diverse as
        possible so that the trained heuristic function generalizes well.

        :param num_states: Number of states to get
        :return: Generated states
        """
        pass

    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        """ Return start/goal pairs with num_steps_l random-walk steps between start and goal.

        :param num_steps_l: Number of steps to take between start and goal
        :param times: Times that can be used to profile code
        :return: List of start states and list of goals
        """
        # Initialize
        if times is None:
            times = Times()

        # Start states
        start_time = time.time()
        states_start: List[S] = self.get_start_states(len(num_steps_l))
        times.record_time("get_start_states", time.time() - start_time)

        # random walk: end states of the walks become the states goals are sampled from
        start_time = time.time()
        states_goal: List[S] = self.random_walk(states_start, num_steps_l)[0]
        times.record_time("random_walk", time.time() - start_time)

        # state to goal
        start_time = time.time()
        goals: List[G] = self.sample_goal(states_start, states_goal)
        times.record_time("sample_goal", time.time() - start_time)

        return states_start, goals

Can sample start states, take actions to obtain another state, and sample a goal from that state

@abstractmethod
def get_start_states(self, num_states: int) -> List[~S]:
328    @abstractmethod
329    def get_start_states(self, num_states: int) -> List[S]:
330        """ A method for generating start states. Should try to make this generate states that are as diverse as
331        possible so that the trained heuristic function generalizes well.
332
333        :param num_states: Number of states to get
334        :return: Generated states
335        """
336        pass

A method for generating start states. Should try to make this generate states that are as diverse as possible so that the trained heuristic function generalizes well.

Parameters
  • num_states: Number of states to get
Returns

Generated states

def get_start_goal_pairs( self, num_steps_l: List[int], times: Optional[deepxube.utils.timing_utils.Times] = None) -> Tuple[List[~S], List[~G]]:
338    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
339        # Initialize
340        if times is None:
341            times = Times()
342
343        # Start states
344        start_time = time.time()
345        states_start: List[S] = self.get_start_states(len(num_steps_l))
346        times.record_time("get_start_states", time.time() - start_time)
347
348        # random walk
349        start_time = time.time()
350        states_goal: List[S] = self.random_walk(states_start, num_steps_l)[0]
351        times.record_time("random_walk", time.time() - start_time)
352
353        # state to goal
354        start_time = time.time()
355        goals: List[G] = self.sample_goal(states_start, states_goal)
356        times.record_time("sample_goal", time.time() - start_time)
357
358        return states_start, goals

Return start goal pairs with num_steps_l between start and goal

Parameters
  • num_steps_l: Number of steps to take between start and goal
  • times: Times that can be used to profile code
Returns

List of start states and list of goals

class GoalStateSampleableFixed(GoalStateSampleable[S, A, G], GoalFixed[S, A, G]):
    """ Can sample states from goal, which is the same for all problem instances """

    @abstractmethod
    def sample_goal_states_fixed(self, num_states: int) -> List[S]:
        """ Sample states that are members of the fixed goal.

        :param num_states: number of goal states to sample
        :return: sampled goal states
        """
        pass

    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
        """ Sample goal states per request; goals is unused since the goal is fixed.

        :param goals: List of goals (not consulted; every instance shares the fixed goal)
        :param num_states_l: how many states to sample for each goal
        :return: one list of sampled states per entry of num_states_l
        """
        sampled_l: List[List[S]] = []
        for num_states in num_states_l:
            sampled_l.append(self.sample_goal_states_fixed(num_states))
        return sampled_l

Can sample states from goal, which is the same for all problem instances

@abstractmethod
def sample_goal_states_fixed(self, num_states: int) -> List[~S]:
364    @abstractmethod
365    def sample_goal_states_fixed(self, num_states: int) -> List[S]:
366        pass
def sample_goal_states(self, goals: List[~G], num_states_l: List[int]) -> List[List[~S]]:
368    def sample_goal_states(self, goals: List[G], num_states_l: List[int]) -> List[List[S]]:
369        return [self.sample_goal_states_fixed(num_states) for num_states in num_states_l]

Given a goal, sample states that are members of that goal.

Parameters
  • goals: List of goals
  • num_states_l: List of integers representing how many states to sample for the corresponding goal
Returns

List of list of states, where each element is a list of states sampled for the corresponding goal

class FixedGoalRevWalk(deepxube.base.domain.GoalStateSampleableFixed[~S, ~A, ~G]):
class FixedGoalRevWalk(GoalStateSampleableFixed[S, A, G]):
    """ Generates start/goal pairs by walking backward from states sampled from the fixed goal. """
    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
        """ Return start/goal pairs with num_steps_l reverse-walk steps between start and goal.

        :param num_steps_l: Number of steps to take between start and goal
        :param times: Times that can be used to profile code
        :return: List of start states and list of goals
        """
        # Initialize
        if times is None:
            times = Times()

        # Start states
        # NOTE(review): goal-state sampling is recorded under the "get_start_states"
        # timing key (mirroring the forward version's labels) -- confirm intended
        start_time = time.time()
        states_goal: List[S] = self.sample_goal_states_fixed(len(num_steps_l))
        times.record_time("get_start_states", time.time() - start_time)

        # random walk, in reverse: from goal states back to start states
        start_time = time.time()
        states_start: List[S] = self.random_walk_rev(states_goal, num_steps_l)
        times.record_time("random_walk", time.time() - start_time)

        # state to goal: every instance shares the single fixed goal
        start_time = time.time()
        goals: List[G] = [self.get_goal()] * len(states_start)
        times.record_time("sample_goal", time.time() - start_time)

        return states_start, goals

    @abstractmethod
    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        """ Walk backward from the given states for the given number of steps.

        :param states: List of states to walk backward from
        :param num_steps_l: number of reverse steps for each state
        :return: resulting states
        """
        pass

Generates start/goal pairs by walking backward from states sampled from the fixed goal.

def get_start_goal_pairs( self, num_steps_l: List[int], times: Optional[deepxube.utils.timing_utils.Times] = None) -> Tuple[List[~S], List[~G]]:
374    def get_start_goal_pairs(self, num_steps_l: List[int], times: Optional[Times] = None) -> Tuple[List[S], List[G]]:
375        # Initialize
376        if times is None:
377            times = Times()
378
379        # Start states
380        start_time = time.time()
381        states_goal: List[S] = self.sample_goal_states_fixed(len(num_steps_l))
382        times.record_time("get_start_states", time.time() - start_time)
383
384        # random walk
385        start_time = time.time()
386        states_start: List[S] = self.random_walk_rev(states_goal, num_steps_l)
387        times.record_time("random_walk", time.time() - start_time)
388
389        # state to goal
390        start_time = time.time()
391        goals: List[G] = [self.get_goal()] * len(states_start)
392        times.record_time("sample_goal", time.time() - start_time)
393
394        return states_start, goals

Return start goal pairs with num_steps_l between start and goal

Parameters
  • num_steps_l: Number of steps to take between start and goal
  • times: Times that can be used to profile code
Returns

List of start states and list of goals

@abstractmethod
def random_walk_rev(self, states: List[~S], num_steps_l: List[int]) -> List[~S]:
396    @abstractmethod
397    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
398        pass
class FixedGoalRevWalkActsRev(FixedGoalRevWalk[S, A, G], ActsRev[S, A, G], ABC):
    """ Implements the reverse walk with a forward random walk; presumably valid because
    actions are reversible (ActsRev) -- confirm edge costs are symmetric if costs matter. """
    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
        # path costs from the forward walk are discarded; only end states are returned
        return self.random_walk(states, num_steps_l)[0]

Implements the reverse walk with a forward random walk, relying on actions being reversible.

def random_walk_rev(self, states: List[~S], num_steps_l: List[int]) -> List[~S]:
402    def random_walk_rev(self, states: List[S], num_steps_l: List[int]) -> List[S]:
403        return self.random_walk(states, num_steps_l)[0]
class NextStateNP(deepxube.base.domain.Domain[~S, ~A, ~G]):
class NextStateNP(Domain[S, A, G]):
    """ Domain whose transition function operates on numpy array representations of states. """
    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
        """ Convert states to numpy, transition, and convert back.

        :param states: List of states
        :param actions: List of actions to take
        :return: Next states, transition costs
        """
        states_np: List[NDArray] = self._states_to_np(states)
        states_next_np, tcs = self._next_state_np(states_np, actions)
        states_next: List[S] = self._np_to_states(states_next_np)

        return states_next, tcs

    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
        """ Random walk performed entirely in the numpy representation; states are
        converted back only once, at the end.

        :param states: List of states
        :param num_steps_l: number of steps to take for each state
        :return: The resulting state and the path cost for each random walk
        """
        states_np = self._states_to_np(states)
        path_costs: List[float] = [0.0 for _ in states]

        num_steps: NDArray[np.int_] = np.array(num_steps_l)
        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
        while np.any(steps_lt):
            # rows of walks that still have steps remaining
            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
            states_np_tomove: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            actions_rand: List[A] = self._get_state_np_action_rand(states_np_tomove)

            states_moved, tcs = self._next_state_np(states_np_tomove, actions_rand)

            # write the moved rows back into the full arrays
            for l_idx in range(len(states_np)):
                states_np[l_idx][idxs] = states_moved[l_idx]
            idx: int
            for act_idx, idx in enumerate(idxs):
                path_costs[idx] += tcs[act_idx]

            num_steps_curr[idxs] = num_steps_curr[idxs] + 1

            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]

        return self._np_to_states(states_np), path_costs

    @abstractmethod
    def _states_to_np(self, states: List[S]) -> List[NDArray]:
        """ Convert states to their numpy representation; rows index states. """
        pass

    @abstractmethod
    def _np_to_states(self, states_np_l: List[NDArray]) -> List[S]:
        """ Inverse of _states_to_np. """
        pass

    @abstractmethod
    def _get_state_np_actions(self, states_np_l: List[NDArray]) -> List[List[A]]:
        """ Get actions applicable to each state, given its numpy representation. """
        pass

    def _get_state_np_action_rand(self, states_np: List[NDArray]) -> List[A]:
        """ Choose one applicable action uniformly at random per state (numpy representation). """
        state_actions_l: List[List[A]] = self._get_state_np_actions(states_np)
        return [random.choice(state_actions) for state_actions in state_actions_l]

    @abstractmethod
    def _next_state_np(self, states_np: List[NDArray], actions: List[A]) -> Tuple[List[NDArray], List[float]]:
        """ Get the next state and transition cost given the current numpy representations of the state and action


        @param states_np: numpy representation of states. Each row in each element of states_np list represents
        information for a different state. There can be one or more multiple elements in the list for each state.
        This object should not be mutated.
        @param actions: actions
        @return: Numpy representation of next states, transition costs
        """
        pass

Domain whose transition function operates on numpy array representations of states.

def next_state( self, states: List[~S], actions: List[~A]) -> Tuple[List[~S], List[float]]:
408    def next_state(self, states: List[S], actions: List[A]) -> Tuple[List[S], List[float]]:
409        states_np: List[NDArray] = self._states_to_np(states)
410        states_next_np, tcs = self._next_state_np(states_np, actions)
411        states_next: List[S] = self._np_to_states(states_next_np)
412
413        return states_next, tcs

Get the next state and transition cost given the current state and action

Parameters
  • states: List of states
  • actions: List of actions to take
Returns

Next states, transition costs

def random_walk( self, states: List[~S], num_steps_l: List[int]) -> Tuple[List[~S], List[float]]:
415    def random_walk(self, states: List[S], num_steps_l: List[int]) -> Tuple[List[S], List[float]]:
416        states_np = self._states_to_np(states)
417        path_costs: List[float] = [0.0 for _ in states]
418
419        num_steps: NDArray[np.int_] = np.array(num_steps_l)
420        num_steps_curr: NDArray[np.int_] = np.zeros(len(states), dtype=int)
421        steps_lt: NDArray[np.bool_] = num_steps_curr < num_steps
422        while np.any(steps_lt):
423            idxs: NDArray[np.int_] = np.where(steps_lt)[0]
424            states_np_tomove: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
425            actions_rand: List[A] = self._get_state_np_action_rand(states_np_tomove)
426
427            states_moved, tcs = self._next_state_np(states_np_tomove, actions_rand)
428
429            for l_idx in range(len(states_np)):
430                states_np[l_idx][idxs] = states_moved[l_idx]
431            idx: int
432            for act_idx, idx in enumerate(idxs):
433                path_costs[idx] += tcs[act_idx]
434
435            num_steps_curr[idxs] = num_steps_curr[idxs] + 1
436
437            steps_lt[idxs] = num_steps_curr[idxs] < num_steps[idxs]
438
439        return self._np_to_states(states_np), path_costs

Perform a random walk on the given states for the given number of steps

Parameters
  • states: List of states
  • num_steps_l: number of steps to take for each state
Returns

The resulting state and the path cost for each random walk

class NextStateNPActsEnum(NextStateNP[S, A, G], ActsEnum[S, A, G], ABC):
    """ Batched state expansion using numpy state representations and enumerable actions. """
    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
        """ Generate all children of each state using the numpy transition function.

        :param states: List of states
        :return: Children of each state, actions, transition costs for each state
        """
        # initialize
        states_np: List[NDArray] = self._states_to_np(states)
        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
        state_actions: List[List[A]] = self.get_state_actions(states)

        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot

        # for each move, get next states, transition costs, and if solved
        while np.any(actions_lt):
            # rows with untried actions remaining
            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
            states_np_idxs: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
            # pop(0) consumes each state's action list in order
            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]

            # next state
            states_next_np, tcs_move = self._next_state_np(states_np_idxs, actions_idxs)
            states_next: List[S] = self._np_to_states(states_next_np)

            # append
            idx: int
            for exp_idx, idx in enumerate(idxs):
                states_exp_l[idx].append(states_next[exp_idx])
                actions_exp_l[idx].append(actions_idxs[exp_idx])
                tcs_l[idx].append(tcs_move[exp_idx])

            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]

        return states_exp_l, actions_exp_l, tcs_l

Combines numpy-based transitions with enumerable actions to expand states in batch.

def expand( self, states: List[~S]) -> Tuple[List[List[~S]], List[List[~A]], List[List[float]]]:
472    def expand(self, states: List[S]) -> Tuple[List[List[S]], List[List[A]], List[List[float]]]:
473        # initialize
474        states_np: List[NDArray] = self._states_to_np(states)
475        states_exp_l: List[List[S]] = [[] for _ in range(len(states))]
476        actions_exp_l: List[List[A]] = [[] for _ in range(len(states))]
477        tcs_l: List[List[float]] = [[] for _ in range(len(states))]
478        state_actions: List[List[A]] = self.get_state_actions(states)
479
480        num_actions_tot: NDArray[np.int_] = np.array([len(x) for x in state_actions])
481        num_actions_taken: NDArray[np.int_] = np.zeros(len(states), dtype=int)
482        actions_lt: NDArray[np.bool_] = num_actions_taken < num_actions_tot
483
484        # for each move, get next states, transition costs, and if solved
485        while np.any(actions_lt):
486            idxs: NDArray[np.int_] = np.where(actions_lt)[0]
487            states_np_idxs: List[NDArray] = [states_np_i[idxs] for states_np_i in states_np]
488            actions_idxs: List[A] = [state_actions[idx].pop(0) for idx in idxs]
489
490            # next state
491            states_next_np, tcs_move = self._next_state_np(states_np_idxs, actions_idxs)
492            states_next: List[S] = self._np_to_states(states_next_np)
493
494            # append
495            idx: int
496            for exp_idx, idx in enumerate(idxs):
497                states_exp_l[idx].append(states_next[exp_idx])
498                actions_exp_l[idx].append(actions_idxs[exp_idx])
499                tcs_l[idx].append(tcs_move[exp_idx])
500
501            num_actions_taken[idxs] = num_actions_taken[idxs] + 1
502            actions_lt[idxs] = num_actions_taken[idxs] < num_actions_tot[idxs]
503
504        return states_exp_l, actions_exp_l, tcs_l

Generate all children for each state; assumes every state has at least one child state

Parameters
  • states: List of states
Returns

Children of each state, actions, transition costs for each state

class NextStateNPActsEnumFixed(NextStateNPActsEnum[S, A, G], ActsEnumFixed[S, A, G], ABC):
    def _get_state_np_actions(self, states_np: List[NDArray]) -> List[List[A]]:
        """ Give every state in the batch its own copy of the fixed action list.

        :param states_np: numpy representation of a batch of states; the first array's
            leading dimension is the batch size
        :return: one independent list of the fixed actions per state
        """
        fixed_actions: List[A] = self.get_actions_fixed()
        num_states: int = states_np[0].shape[0]
        return [list(fixed_actions) for _ in range(num_states)]

Helper class that provides a standard way to create an ABC using inheritance.

class SupportsPDDL(deepxube.base.domain.Domain[~S, ~A, ~G], abc.ABC):
class SupportsPDDL(Domain[S, A, G], ABC):
    """ Mixin interface for domains that can be expressed in PDDL, enabling
    the use of PDDL-based planners.
    """

    @abstractmethod
    def get_pddl_domain(self) -> List[str]:
        """ Get the PDDL domain definition.

        :return: PDDL domain text; presumably one list element per line — TODO confirm
        """
        pass

    @abstractmethod
    def state_goal_to_pddl_inst(self, state: S, goal: G) -> List[str]:
        """ Build a PDDL problem instance for the given state/goal pair.

        :param state: start state
        :param goal: goal
        :return: PDDL problem instance text; presumably one list element per line — TODO confirm
        """
        pass

    @abstractmethod
    def pddl_action_to_action(self, pddl_action: str) -> A:
        """ Parse a PDDL action string back into a domain action.

        :param pddl_action: action as produced by a PDDL planner
        :return: corresponding domain action
        """
        pass

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def get_pddl_domain(self) -> List[str]:
514    @abstractmethod
515    def get_pddl_domain(self) -> List[str]:
516        pass
@abstractmethod
def state_goal_to_pddl_inst(self, state: ~S, goal: ~G) -> List[str]:
518    @abstractmethod
519    def state_goal_to_pddl_inst(self, state: S, goal: G) -> List[str]:
520        pass
@abstractmethod
def pddl_action_to_action(self, pddl_action: str) -> ~A:
522    @abstractmethod
523    def pddl_action_to_action(self, pddl_action: str) -> A:
524        pass
class GoalGrndAtoms(deepxube.base.domain.GoalSampleable[~S, ~A, ~G]):
class GoalGrndAtoms(GoalSampleable[S, A, G]):
    """ Domain mixin whose states and goals convert to/from logical models
    (sets of ground atoms), for use with the clingo solver.
    """

    @abstractmethod
    def state_to_model(self, states: List[S]) -> List[Model]:
        """ Convert each state to its model (set of ground atoms). """
        pass

    @abstractmethod
    def model_to_state(self, models: List[Model]) -> List[S]:
        """ Convert models back to states; assumes each model fully specifies a state.

        :param models: fully specified state models
        :return: states
        """
        pass

    @abstractmethod
    def goal_to_model(self, goals: List[G]) -> List[Model]:
        """ Convert each goal to its model representation. """
        pass

    @abstractmethod
    def model_to_goal(self, models: List[Model]) -> List[G]:
        """ Convert each model to a goal. """
        pass

    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
        """ Check which states satisfy their corresponding goals.

        A state is solved when the goal's atoms are a subset of the state's atoms.

        :param states: List of states
        :param goals: List of goals
        :return: list of booleans; element i is True iff states[i] satisfies goals[i]
        """
        models_s: List[Model] = self.state_to_model(states)
        models_g: List[Model] = self.goal_to_model(goals)
        return [model_g.issubset(model_s) for model_s, model_g in zip(models_s, models_g)]

    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
        """ Sample a goal per goal state by keeping a random subset of its atoms.

        Each goal state's model is thinned with its own random keep probability,
        so any state containing the kept atoms satisfies the resulting goal.

        :param states_start: List of start states (not used by this implementation)
        :param states_goal: List of states from which goals will be sampled
        :return: goals
        """
        goal_models: List[Model] = []
        keep_probs: NDArray[np.float64] = np.random.rand(len(states_goal))
        for model_s, keep_prob in zip(self.state_to_model(states_goal), keep_probs):
            kept: Set[Atom] = misc_utils.random_subset(model_s, keep_prob)
            goal_models.append(frozenset(kept))

        return self.model_to_goal(goal_models)

    @abstractmethod
    def get_bk(self) -> List[str]:
        """ Get background knowledge; each element in the list is one line.

        :return: background knowledge lines
        """
        pass

    @abstractmethod
    def get_ground_atoms(self) -> List[Atom]:
        """ Get all possible ground atoms that can be used to make a state.

        :return: ground atoms
        """
        pass

    @abstractmethod
    def on_model(self, m: ModelCl) -> Model:
        """ Process a model returned by clingo.

        :param m: clingo model
        :return: model as a set of ground atoms
        """
        pass

    @abstractmethod
    def start_state_fixed(self, states: List[S]) -> List[Model]:
        """ Given the start state, what must also be true for the goal state (i.e. immovable walls)

        :param states: start states
        :return: atoms fixed by each start state
        """
        pass

Can sample goals from states

@abstractmethod
def state_to_model(self, states: List[~S]) -> List[FrozenSet[Tuple[str, ...]]]:
528    @abstractmethod
529    def state_to_model(self, states: List[S]) -> List[Model]:
530        pass
@abstractmethod
def model_to_state(self, models: List[FrozenSet[Tuple[str, ...]]]) -> List[~S]:
532    @abstractmethod
533    def model_to_state(self, models: List[Model]) -> List[S]:
534        """ Assumes model is a fully specified state
535
536        :param models:
537        :return:
538        """
539        pass

Assumes model is a fully specified state

Parameters
  • models:
Returns
@abstractmethod
def goal_to_model(self, goals: List[~G]) -> List[FrozenSet[Tuple[str, ...]]]:
541    @abstractmethod
542    def goal_to_model(self, goals: List[G]) -> List[Model]:
543        pass
@abstractmethod
def model_to_goal(self, models: List[FrozenSet[Tuple[str, ...]]]) -> List[~G]:
545    @abstractmethod
546    def model_to_goal(self, models: List[Model]) -> List[G]:
547        pass
def is_solved(self, states: List[~S], goals: List[~G]) -> List[bool]:
549    def is_solved(self, states: List[S], goals: List[G]) -> List[bool]:
550        """ Returns whether or not state is solved
551
552        :param states: List of states
553        :param goals: List of goals
554        :return: Boolean numpy array where the element at index i corresponds to whether or not the
555        state at index i is solved
556        """
557        models_g: List[Model] = self.goal_to_model(goals)
558        is_solved_l: List[bool] = []
559        models_s: List[Model] = self.state_to_model(states)
560        for model_state, model_goal in zip(models_s, models_g):
561            is_solved_l.append(model_goal.issubset(model_state))
562
563        return is_solved_l

Returns whether or not each state is solved

Parameters
  • states: List of states
  • goals: List of goals
Returns

List of booleans where the element at index i corresponds to whether or not the state at index i is solved

def sample_goal(self, states_start: List[~S], states_goal: List[~S]) -> List[~G]:
565    def sample_goal(self, states_start: List[S], states_goal: List[S]) -> List[G]:
566        models_g: List[Model] = []
567
568        models_s: List[Model] = self.state_to_model(states_goal)
569        keep_probs: NDArray[np.float64] = np.random.rand(len(states_goal))
570        for model_s, keep_prob in zip(models_s, keep_probs):
571            rand_subset: Set[Atom] = misc_utils.random_subset(model_s, keep_prob)
572            models_g.append(frozenset(rand_subset))
573
574        return self.model_to_goal(models_g)

Given a state, return a goal that represents a set of goal states of which the given state is a member. Does not have to always return the same goal.

Parameters
  • states_start: List of start states
  • states_goal: List of states from which goals will be sampled
Returns

Goals

@abstractmethod
def get_bk(self) -> List[str]:
576    @abstractmethod
577    def get_bk(self) -> List[str]:
578        """ get background, each element in list is a line
579
580        :return:
581        """
582        pass

get background, each element in list is a line

Returns
@abstractmethod
def get_ground_atoms(self) -> List[Tuple[str, ...]]:
584    @abstractmethod
585    def get_ground_atoms(self) -> List[Atom]:
586        """ Get all possible ground atoms that can be used to make a state
587
588        :return:
589        """
590        pass

Get all possible ground atoms that can be used to make a state

Returns
@abstractmethod
def on_model(self, m: clingo.solving.Model) -> FrozenSet[Tuple[str, ...]]:
592    @abstractmethod
593    def on_model(self, m: ModelCl) -> Model:
594        """ Process results from clingo
595
596        :param m:
597        :return:
598        """
599        pass

Process results from clingo

Parameters
  • m:
Returns
@abstractmethod
def start_state_fixed(self, states: List[~S]) -> List[FrozenSet[Tuple[str, ...]]]:
601    @abstractmethod
602    def start_state_fixed(self, states: List[S]) -> List[Model]:
603        """ Given the start state, what must also be true for the goal state (i.e. immovable walls)
604
605        :param states:
606        :return:
607        """
608        pass

Given the start state, what must also be true for the goal state (i.e. immovable walls)

Parameters
  • states:
Returns