Creating a Custom Domain and Heuristic Function

We will create a simple grid domain where the agent can move up, down, left, or right along a two-dimensional grid to reach a goal square. We will create neural network inputs for a neural network that DeepXube provides as well as a neural network input for our own custom neural network.

In the directory in which you run deepxube, create a domains/grid_tutorial.py file. DeepXube automatically looks in the domains/ folder to see what is registered.

Implementation

The entire domain file is here. This includes the states, actions, goals, domain, neural network inputs, custom neural network, and parsers. This file will be explained part-by-part.

from typing import List, Tuple, Optional, Type
import numpy as np
from torch import nn, Tensor

from deepxube.base.factory import DelimParser
from deepxube.base.domain import State, Action, Goal, ActsEnumFixed, StartGoalWalkable, StateGoalVizable, StringToAct
from deepxube.base.nnet_input import StateGoalIn, StateGoalActFixIn, StateGoalActIn, FlatIn
from deepxube.base.heuristic import HeurNNet

from deepxube.factories.heuristic_factory import heuristic_factory
from deepxube.factories.domain_factory import domain_factory
from deepxube.factories.nnet_input_factory import register_nnet_input

from deepxube.nnet.pytorch_models import Conv2dModel, FullyConnectedModel

from numpy.typing import NDArray
import random

from matplotlib.figure import Figure
from matplotlib.colors import ListedColormap
from matplotlib.axes import Axes


# start sag
class GridState(State):
    """Position of the robot on the grid.

    States hash and compare by value so they can be used as dictionary keys
    and recognised when the same cell is reached again.
    """

    def __init__(self, robot_x: int, robot_y: int):
        self.robot_x: int = robot_x
        self.robot_y: int = robot_y

    def __hash__(self) -> int:
        # Hash the coordinate pair, not the sum: the sum gives every cell on an
        # anti-diagonal, e.g. (1, 2) and (2, 1), the same hash value.
        return hash((self.robot_x, self.robot_y))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GridState):
            return NotImplemented
        return (self.robot_x == other.robot_x) and (self.robot_y == other.robot_y)


class GridGoal(Goal):
    """Goal given as the grid cell the robot has to reach."""

    def __init__(self, robot_x: int, robot_y: int):
        # Target cell, expressed with the same attribute names as GridState.
        self.robot_x: int = robot_x
        self.robot_y: int = robot_y


class GridAction(Action):
    """A move on the grid, identified by an integer index.

    Index 0, 1, 2, 3 is printed as UP, DOWN, LEFT, RIGHT respectively.
    """

    def __init__(self, action: int):
        self.action: int = action

    def __hash__(self) -> int:
        # The index itself is already a suitable hash value.
        return self.action

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GridAction):
            return NotImplemented
        return self.action == other.action

    def __repr__(self) -> str:
        names = ["UP", "DOWN", "LEFT", "RIGHT"]
        return names[self.action]
# end sag


# start def
@domain_factory.register_class("grid_tut")
class Grid(ActsEnumFixed[GridState, GridAction, GridGoal], StartGoalWalkable[GridState, GridAction, GridGoal],
           StateGoalVizable[GridState, GridAction, GridGoal], StringToAct[GridState, GridAction, GridGoal]):
    """Square grid in which a robot moves one cell at a time toward a goal cell.

    ``robot_x`` is drawn as the row and ``robot_y`` as the column, with row 0 at
    the top of the image, so "up" decreases ``robot_x`` and "right" increases
    ``robot_y``.
    """

    def __init__(self, dim: int = 7):
        """Create a ``dim`` x ``dim`` grid.

        Args:
            dim: Number of cells along each side of the grid.
        """
        super().__init__()
        self.dim: int = dim
        # Action indices: 0 = up, 1 = down, 2 = left, 3 = right (see GridAction.__repr__).
        self.actions_fixed: List[GridAction] = [GridAction(idx) for idx in range(4)]
    # end init

    # start domain methods
    def is_solved(self, states: List[GridState], goals: List[GridGoal]) -> List[bool]:
        """Return, for each state/goal pair, whether the robot is on the goal cell."""
        return [(state.robot_x == goal.robot_x) and (state.robot_y == goal.robot_y) for state, goal in zip(states, goals)]

    def next_state(self, states: List[GridState], actions: List[GridAction]) -> Tuple[List[GridState], List[float]]:
        """Apply one action to each state.

        A move that would leave the grid keeps the robot on the border cell.

        Returns:
            The next states and a transition cost of 1.0 for each of them.

        Raises:
            ValueError: If an action index is not 0, 1, 2 or 3. Skipping such an
                action would leave the output shorter than the input.
        """
        last: int = self.dim - 1
        states_next: List[GridState] = []
        for state, action in zip(states, actions):
            if action.action == 0:  # up: one row toward the top
                states_next.append(GridState(max(state.robot_x - 1, 0), state.robot_y))
            elif action.action == 1:  # down: one row toward the bottom
                states_next.append(GridState(min(state.robot_x + 1, last), state.robot_y))
            elif action.action == 2:  # left: one column toward the left edge
                states_next.append(GridState(state.robot_x, max(state.robot_y - 1, 0)))
            elif action.action == 3:  # right: one column toward the right edge
                states_next.append(GridState(state.robot_x, min(state.robot_y + 1, last)))
            else:
                raise ValueError(f"Unknown action index: {action.action}")

        return states_next, [1.0] * len(states_next)
    # end domain methods

    # start actsenumfixed methods
    def get_actions_fixed(self) -> List[GridAction]:
        """Return a copy of the action list so callers cannot modify the stored one."""
        return self.actions_fixed.copy()
    # end actsenumfixed methods

    # start startgoalwalkable methods
    def sample_start_states(self, num_states: int) -> List[GridState]:
        """Sample ``num_states`` states with the robot on a uniformly random cell."""
        return [GridState(random.randint(0, self.dim - 1), random.randint(0, self.dim - 1)) for _ in range(num_states)]

    def sample_goal_from_state(self, states_start: Optional[List[GridState]], states_goal: List[GridState]) -> List[GridGoal]:
        """Turn each state in ``states_goal`` into a goal at the same cell.

        ``states_start`` is not used.
        """
        return [GridGoal(state_goal.robot_x, state_goal.robot_y) for state_goal in states_goal]
    # end startgoalwalkable methods

    # start viz methods
    def visualize_state_goal(self, state: GridState, goal: GridGoal, fig: Figure) -> None:
        """Draw the grid on ``fig`` with cell values 0 (empty), 1 (robot) and 2 (goal)."""
        ax: Axes = fig.subplots(1, 1)
        grid: NDArray = np.zeros((self.dim, self.dim))
        # The goal is written first, so the robot's value replaces it when both share a cell.
        grid[goal.robot_x, goal.robot_y] = 2
        grid[state.robot_x, state.robot_y] = 1
        ax.imshow(grid, cmap=ListedColormap(["white", "black", "green"]), origin="upper")

    def string_to_action(self, act_str: str) -> Optional[GridAction]:
        """Map "w", "s", "a", "d" to action index 0, 1, 2, 3; return None for anything else."""
        keys = ("w", "s", "a", "d")
        if act_str not in keys:
            return None
        return GridAction(keys.index(act_str))

    def string_to_action_help(self) -> str:
        """Return the help text describing the keys accepted by string_to_action."""
        return "w, s, a, or d for up, down, left, and right, respectively."
    # end viz methods

    # start repr methods
    def __repr__(self) -> str:
        return f"Grid(dim={self.dim})"
    # end repr methods


# start domain parser
@domain_factory.register_parser("grid_tut")
class GridParser(DelimParser):
    """Parser for the arguments given after the "." in the ``grid_tut`` domain name."""

    def __init__(self) -> None:
        super().__init__()
        # "d" is the command-line suffix (as in "grid_tut.7d"); "dim" is the same name as
        # Grid.__init__'s parameter, presumably the keyword the parsed value is passed as.
        self.add_argument("d", "dim", int, "dimensionality of grid")

    @property
    def delim(self) -> str:
        # Presumably the separator between several arguments -- confirm against DelimParser.
        return "_"
# end domain parser


# start gridflatin definition
@register_nnet_input("grid_tut", "grid_flat_in")
class GridFlatIn(StateGoalIn[Grid, GridState, GridGoal], FlatIn[Grid]):
    """Flat encoding of a state/goal pair as four integer coordinates."""

    def get_input_info(self) -> Tuple[List[int], List[int]]:
        # One input with 4 entries; the second list holds the grid size, presumably
        # the number of values each entry can take -- confirm against FlatIn.
        entry_counts: List[int] = [4]
        value_counts: List[int] = [self.domain.dim]
        return entry_counts, value_counts

    def to_np(self, states: List[GridState], goals: List[GridGoal]) -> List[NDArray]:
        """Return one array with a row per pair: (state x, state y, goal x, goal y)."""
        columns: List[NDArray] = [
            np.stack([state.robot_x for state in states]),
            np.stack([state.robot_y for state in states]),
            np.stack([goal.robot_x for goal in goals]),
            np.stack([goal.robot_y for goal in goals]),
        ]
        return [np.stack(columns, axis=1)]
# end gridflatin definition


# start gridflatinqfix definition
@register_nnet_input("grid_tut", "grid_flat_in_qfix")
class GridFlatInQFix(StateGoalActFixIn[Grid, GridState, GridGoal, GridAction], FlatIn[Grid]):
    """Flat state/goal encoding that also returns the action indices of each pair."""

    def get_input_info(self) -> Tuple[List[int], List[int]]:
        # Same description as the plain flat input: one input with 4 entries plus the grid size.
        entry_counts: List[int] = [4]
        value_counts: List[int] = [self.domain.dim]
        return entry_counts, value_counts

    def to_np(self, states: List[GridState], goals: List[GridGoal], actions_l: List[List[GridAction]]) -> List[NDArray]:
        """Return the state/goal coordinate array followed by the array of action indices."""
        # One row of action indices per entry of actions_l.
        actions_np: NDArray = np.array([[action_i.action for action_i in actions] for actions in actions_l])
        columns: List[NDArray] = [
            np.stack([state.robot_x for state in states]),
            np.stack([state.robot_y for state in states]),
            np.stack([goal.robot_x for goal in goals]),
            np.stack([goal.robot_y for goal in goals]),
        ]
        state_goal_np: NDArray = np.stack(columns, axis=1)
        return [state_goal_np, actions_np]
# end gridflatinqfix definition


# start gridflatinactin definition
@register_nnet_input("grid_tut", "grid_flat_in_actin")
class GridFlatInActIn(StateGoalActIn[Grid, GridState, GridGoal, GridAction], FlatIn[Grid]):
    """Flat state/goal encoding with the chosen action supplied as a second input."""

    def get_input_info(self) -> Tuple[List[int], List[int]]:
        # Two inputs: 4 coordinate entries and 1 action entry, paired with the grid
        # size and the number of actions respectively.
        entry_counts: List[int] = [4, 1]
        value_counts: List[int] = [self.domain.dim, self.domain.get_num_acts()]
        return entry_counts, value_counts

    def to_np(self, states: List[GridState], goals: List[GridGoal], actions: List[GridAction]) -> List[NDArray]:
        """Return the state/goal coordinate array followed by a one-column action array."""
        action_indices: NDArray = np.array([action_i.action for action_i in actions])
        # Add a trailing axis so each action sits in its own row.
        actions_np: NDArray = np.expand_dims(action_indices, axis=1)
        columns: List[NDArray] = [
            np.stack([state.robot_x for state in states]),
            np.stack([state.robot_y for state in states]),
            np.stack([goal.robot_x for goal in goals]),
            np.stack([goal.robot_y for goal in goals]),
        ]
        state_goal_np: NDArray = np.stack(columns, axis=1)
        return [state_goal_np, actions_np]
# end gridflatinactin definition


# start grid nnet definition
@register_nnet_input("grid_tut", "grid_nnet_input")
class GridNNetInput(StateGoalIn[Grid, GridState, GridGoal]):
    """Image-like encoding: one plane marks the robot's cell, a second marks the goal's."""

    def get_input_info(self) -> int:
        """Return the side length of the grid."""
        return self.domain.dim

    def to_np(self, states: List[GridState], goals: List[GridGoal]) -> List[NDArray]:
        """Return a single array of shape (len(states), 2, dim, dim) filled with zeros and ones."""
        side: int = self.domain.dim
        planes: NDArray = np.zeros((len(states), 2, side, side))
        for row, (state, goal) in enumerate(zip(states, goals)):
            planes[row, 0, state.robot_x, state.robot_y] = 1  # plane 0: robot
            planes[row, 1, goal.robot_x, goal.robot_y] = 1  # plane 1: goal

        return [planes]


@heuristic_factory.register_class("gridnet_tut")
class GridNet(HeurNNet[GridNNetInput]):
    """Convolutional heuristic network that consumes the two-plane grid encoding."""

    @staticmethod
    def nnet_input_type() -> Type[GridNNetInput]:
        """Return the input class this network is built for."""
        return GridNNetInput

    def __init__(self, nnet_input: GridNNetInput, out_dim: int, q_fix: bool, chan_size: int = 8, fc_size: int = 100):
        super().__init__(nnet_input, out_dim, q_fix)
        # one hots
        # NOTE(review): this list is created empty and is not used anywhere else in this class.
        self.one_hots: nn.ModuleList = nn.ModuleList()
        grid_dim: int = self.nnet_input.get_input_info()

        # Layers are created in the same order in which they are applied.
        conv: nn.Module = Conv2dModel(2, [chan_size, chan_size], [3, 3], [1, 1], ["RELU", "RELU"],
                                      batch_norms=[True, True])
        flatten: nn.Module = nn.Flatten()
        # The input size assumes the convolutions keep the grid_dim x grid_dim spatial size
        # -- TODO confirm against Conv2dModel.
        hidden: nn.Module = FullyConnectedModel(grid_dim * grid_dim * chan_size, [fc_size], ["RELU"],
                                                batch_norms=[True])
        output: nn.Module = nn.Linear(fc_size, self.out_dim)
        self.heur: nn.Module = nn.Sequential(conv, flatten, hidden, output)

    def _forward(self, inputs: List[Tensor]) -> Tensor:
        """Run the network on the first (and only expected) input tensor."""
        return self.heur(inputs[0])


@heuristic_factory.register_parser("gridnet_tut")
class GridNetParser(DelimParser):
    def __init__(self) -> None:
        super().__init__()
        self.add_argument("ch", "chan_size", int, "number of channels")
        self.add_argument("fc", "fc_size", int, "size of fully connected layer")

    @property
    def delim(self) -> str:
        return "_"

Tip

Once this file is in your domains/ folder, the domain is registered and “grid_tut” should appear in the output of: deepxube domain_info

State, Action, Goal

To facilitate using states with Python dictionary objects and re-identifying states during search, all State objects must implement __hash__ and __eq__. This must also be done for Action objects.

class GridState(State):
    """Position of the robot on the grid.

    States hash and compare by value so they can be used as dictionary keys
    and recognised when the same cell is reached again.
    """

    def __init__(self, robot_x: int, robot_y: int):
        self.robot_x: int = robot_x
        self.robot_y: int = robot_y

    def __hash__(self) -> int:
        # Hash the coordinate pair, not the sum: the sum gives every cell on an
        # anti-diagonal, e.g. (1, 2) and (2, 1), the same hash value.
        return hash((self.robot_x, self.robot_y))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GridState):
            return NotImplemented
        return (self.robot_x == other.robot_x) and (self.robot_y == other.robot_y)


class GridGoal(Goal):
    """Goal given as the grid cell the robot has to reach."""

    def __init__(self, robot_x: int, robot_y: int):
        # Target cell, expressed with the same attribute names as GridState.
        self.robot_x: int = robot_x
        self.robot_y: int = robot_y


class GridAction(Action):
    """A move on the grid, identified by an integer index.

    Index 0, 1, 2, 3 is printed as UP, DOWN, LEFT, RIGHT respectively.
    """

    def __init__(self, action: int):
        self.action: int = action

    def __hash__(self) -> int:
        # The index itself is already a suitable hash value.
        return self.action

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, GridAction):
            return NotImplemented
        return self.action == other.action

    def __repr__(self) -> str:
        names = ["UP", "DOWN", "LEFT", "RIGHT"]
        return names[self.action]
# end sag


# start def
@domain_factory.register_class("grid_tut")
class Grid(ActsEnumFixed[GridState, GridAction, GridGoal], StartGoalWalkable[GridState, GridAction, GridGoal],
           StateGoalVizable[GridState, GridAction, GridGoal], StringToAct[GridState, GridAction, GridGoal]):
    """Grid domain registered under the name ``grid_tut``."""

    def __init__(self, dim: int = 7):
        """Store the grid size and build the fixed list of the four actions (indices 0-3)."""
        super().__init__()
        self.dim: int = dim
        self.actions_fixed: List[GridAction] = [GridAction(idx) for idx in range(4)]

Tip

Implementing __repr__ for Action objects can be convenient since actions are printed to the screen when interacting with problem instances with deepxube viz.

Domain

Registration, Mixins, and Initialization

We will register the domain with the name grid_tut. This tells DeepXube that this name refers to the domain being defined.

We will use the deepxube.base.domain.ActsEnumFixed mixin since the action space is fixed (up, down, left, right) and enumerable. We will use the deepxube.base.domain.StartGoalWalkable mixin to generate problem instances by sampling a start state, taking a random walk, and using the terminal state to sample a goal. We will also use the deepxube.base.domain.StateGoalVizable and deepxube.base.domain.StringToAct mixins to interact with the domain using deepxube viz. The domain will be given an argument for its dimensionality.

@domain_factory.register_class("grid_tut")
class Grid(ActsEnumFixed[GridState, GridAction, GridGoal], StartGoalWalkable[GridState, GridAction, GridGoal],
           StateGoalVizable[GridState, GridAction, GridGoal], StringToAct[GridState, GridAction, GridGoal]):
    """Grid domain registered under the name ``grid_tut``."""

    def __init__(self, dim: int = 7):
        """Store the grid size and build the fixed list of the four actions (indices 0-3)."""
        super().__init__()
        self.dim: int = dim
        self.actions_fixed: List[GridAction] = [GridAction(idx) for idx in range(4)]

Important

A default value should be set for all Domain arguments in case they are not set via the command line.

Domain methods

The abstract methods from deepxube.base.domain.Domain not implemented by mixins are deepxube.base.domain.Domain.is_solved() and deepxube.base.domain.Domain.next_state(). is_solved checks if the x and y location of the agent is at the goal x and y location and next_state moves the agent in the corresponding direction with a transition cost of 1 for all actions.

    def is_solved(self, states: List[GridState], goals: List[GridGoal]) -> List[bool]:
        """Return, for each state/goal pair, whether the robot is on the goal cell."""
        solved: List[bool] = []
        for state, goal in zip(states, goals):
            same_x = state.robot_x == goal.robot_x
            same_y = state.robot_y == goal.robot_y
            solved.append(same_x and same_y)
        return solved

    def next_state(self, states: List[GridState], actions: List[GridAction]) -> Tuple[List[GridState], List[float]]:
        """Apply one action to each state.

        Action indices follow GridAction.__repr__: 0 = up, 1 = down, 2 = left,
        3 = right. A move that would leave the grid keeps the robot on the
        border cell.

        Returns:
            The next states and a transition cost of 1.0 for each of them.

        Raises:
            ValueError: If an action index is not 0, 1, 2 or 3. Skipping such an
                action would leave the output shorter than the input.
        """
        last: int = self.dim - 1
        states_next: List[GridState] = []
        for state, action in zip(states, actions):
            if action.action == 0:  # up: robot_x decreases
                states_next.append(GridState(max(state.robot_x - 1, 0), state.robot_y))
            elif action.action == 1:  # down: robot_x increases
                states_next.append(GridState(min(state.robot_x + 1, last), state.robot_y))
            elif action.action == 2:  # left: robot_y decreases
                states_next.append(GridState(state.robot_x, max(state.robot_y - 1, 0)))
            elif action.action == 3:  # right: robot_y increases
                states_next.append(GridState(state.robot_x, min(state.robot_y + 1, last)))
            else:
                raise ValueError(f"Unknown action index: {action.action}")

        return states_next, [1.0] * len(states_next)

ActsEnumFixed methods

deepxube.base.domain.ActsEnumFixed automatically implements deepxube.base.domain.Domain.sample_state_action() based on the abstract method deepxube.base.domain.ActsEnumFixed.get_actions_fixed(). This is implemented by simply returning a copy of the list created in the __init__ method containing all actions.

    def get_actions_fixed(self) -> List[GridAction]:
        """Return a new list holding the actions stored in ``self.actions_fixed``."""
        actions_copy: List[GridAction] = self.actions_fixed.copy()
        return actions_copy

StartGoalWalkable methods

deepxube.base.domain.StartGoalWalkable automatically implements deepxube.base.domain.Domain.sample_problem_instances() based on the abstract methods deepxube.base.domain.StartGoalWalkable.sample_start_states() and deepxube.base.domain.GoalSampleableFromState.sample_goal_from_state(). sample_start_states is implemented by placing the agent at a random x, y location. sample_goal_from_state is implemented by using the x, y of the agent’s location as the desired goal.

    def sample_start_states(self, num_states: int) -> List[GridState]:
        """Sample ``num_states`` states with the robot on a uniformly random cell."""
        last: int = self.dim - 1
        starts: List[GridState] = []
        for _ in range(num_states):
            # Draw x before y, one randint call each.
            x: int = random.randint(0, last)
            y: int = random.randint(0, last)
            starts.append(GridState(x, y))
        return starts

    def sample_goal_from_state(self, states_start: Optional[List[GridState]], states_goal: List[GridState]) -> List[GridGoal]:
        """Turn each state in ``states_goal`` into a goal at the same cell.

        ``states_start`` is not used.
        """
        goals: List[GridGoal] = []
        for reached in states_goal:
            goals.append(GridGoal(reached.robot_x, reached.robot_y))
        return goals

Visualization and Interaction Methods

deepxube.base.domain.StateGoalVizable and deepxube.base.domain.StringToAct allow for the visualization of problem instances and interaction with them using the terminal. A simple grid is created with black and green to indicate the locations of the agent and goal, respectively.

    def visualize_state_goal(self, state: GridState, goal: GridGoal, fig: Figure) -> None:
        """Draw the grid on ``fig`` with cell values 0 (empty), 1 (robot) and 2 (goal)."""
        ax: Axes = fig.subplots(1, 1)
        cells: NDArray = np.zeros((self.dim, self.dim))
        # The goal is written first, so the robot's value replaces it when both share a cell.
        cells[goal.robot_x, goal.robot_y] = 2
        cells[state.robot_x, state.robot_y] = 1
        palette = ListedColormap(["white", "black", "green"])
        ax.imshow(cells, cmap=palette, origin="upper")

    def string_to_action(self, act_str: str) -> Optional[GridAction]:
        """Map "w", "s", "a", "d" to action index 0, 1, 2, 3; return None for anything else."""
        keys = ["w", "s", "a", "d"]
        if act_str not in keys:
            return None
        return GridAction(keys.index(act_str))

    def string_to_action_help(self) -> str:
        """Return the help text describing the keys accepted by string_to_action."""
        return "w, s, a, or d for up, down, left, and right, respectively."

Representation Method

    def __repr__(self) -> str:
        """Return the domain name together with its ``dim`` parameter."""
        return "Grid(dim={})".format(self.dim)

Tip

Implementing __repr__ for Domain objects can be convenient since the domain is printed to the screen and output.txt file during training and solving. Having an identifiable name for the domain along with a clear representation of its parameters can be helpful when looking back on different runs.

Domain Parser

To allow the user to set parameters of the domain via the command line, one can implement a deepxube.base.factory.Parser class and register it with the same name as the domain. The deepxube.base.factory.DelimParser is a subclass that makes it easy to define parsing and help messages.

@domain_factory.register_parser("grid_tut")
class GridParser(DelimParser):
    """Parser for the arguments given after the "." in the ``grid_tut`` domain name."""

    def __init__(self) -> None:
        super().__init__()
        # "d" is the command-line suffix (as in "grid_tut.7d"); "dim" is the same name as
        # Grid.__init__'s parameter, presumably the keyword the parsed value is passed as.
        self.add_argument("d", "dim", int, "dimensionality of grid")

    @property
    def delim(self) -> str:
        # Presumably the separator between several arguments -- confirm against DelimParser.
        return "_"

Now, grid domains of different dimensions can be created using the command-line:

deepxube viz --domain grid_tut.7d --steps 100

deepxube viz --domain grid_tut.20d --steps 100

Note

Everything after the “.” in the domain name is given to the parser to be parsed.

Neural Network Inputs

Flat Input

@register_nnet_input("grid_tut", "grid_flat_in")
class GridFlatIn(StateGoalIn[Grid, GridState, GridGoal], FlatIn[Grid]):
    """Flat encoding of a state/goal pair as four integer coordinates."""

    def get_input_info(self) -> Tuple[List[int], List[int]]:
        # One input with 4 entries; the second list holds the grid size, presumably
        # the number of values each entry can take -- confirm against FlatIn.
        entry_counts: List[int] = [4]
        value_counts: List[int] = [self.domain.dim]
        return entry_counts, value_counts

    def to_np(self, states: List[GridState], goals: List[GridGoal]) -> List[NDArray]:
        """Return one array with a row per pair: (state x, state y, goal x, goal y)."""
        columns: List[NDArray] = [
            np.stack([state.robot_x for state in states]),
            np.stack([state.robot_y for state in states]),
            np.stack([goal.robot_x for goal in goals]),
            np.stack([goal.robot_y for goal in goals]),
        ]
        return [np.stack(columns, axis=1)]

Flat Input for a Q-Network with a Fixed Action Output

@register_nnet_input("grid_tut", "grid_flat_in_qfix")
class GridFlatInQFix(StateGoalActFixIn[Grid, GridState, GridGoal, GridAction], FlatIn[Grid]):
    """Flat state/goal encoding that also returns the action indices of each pair."""

    def get_input_info(self) -> Tuple[List[int], List[int]]:
        # Same description as the plain flat input: one input with 4 entries plus the grid size.
        entry_counts: List[int] = [4]
        value_counts: List[int] = [self.domain.dim]
        return entry_counts, value_counts

    def to_np(self, states: List[GridState], goals: List[GridGoal], actions_l: List[List[GridAction]]) -> List[NDArray]:
        """Return the state/goal coordinate array followed by the array of action indices."""
        # One row of action indices per entry of actions_l.
        actions_np: NDArray = np.array([[action_i.action for action_i in actions] for actions in actions_l])
        columns: List[NDArray] = [
            np.stack([state.robot_x for state in states]),
            np.stack([state.robot_y for state in states]),
            np.stack([goal.robot_x for goal in goals]),
            np.stack([goal.robot_y for goal in goals]),
        ]
        state_goal_np: NDArray = np.stack(columns, axis=1)
        return [state_goal_np, actions_np]

Flat Input for a Q-Network with the Action as an Input

@register_nnet_input("grid_tut", "grid_flat_in_actin")
class GridFlatInActIn(StateGoalActIn[Grid, GridState, GridGoal, GridAction], FlatIn[Grid]):
    """Flat state/goal encoding with the chosen action supplied as a second input."""

    def get_input_info(self) -> Tuple[List[int], List[int]]:
        # Two inputs: 4 coordinate entries and 1 action entry, paired with the grid
        # size and the number of actions respectively.
        entry_counts: List[int] = [4, 1]
        value_counts: List[int] = [self.domain.dim, self.domain.get_num_acts()]
        return entry_counts, value_counts

    def to_np(self, states: List[GridState], goals: List[GridGoal], actions: List[GridAction]) -> List[NDArray]:
        """Return the state/goal coordinate array followed by a one-column action array."""
        action_indices: NDArray = np.array([action_i.action for action_i in actions])
        # Add a trailing axis so each action sits in its own row.
        actions_np: NDArray = np.expand_dims(action_indices, axis=1)
        columns: List[NDArray] = [
            np.stack([state.robot_x for state in states]),
            np.stack([state.robot_y for state in states]),
            np.stack([goal.robot_x for goal in goals]),
            np.stack([goal.robot_y for goal in goals]),
        ]
        state_goal_np: NDArray = np.stack(columns, axis=1)
        return [state_goal_np, actions_np]

Custom Neural Network

@register_nnet_input("grid_tut", "grid_nnet_input")
class GridNNetInput(StateGoalIn[Grid, GridState, GridGoal]):
    """Image-like encoding: one plane marks the robot's cell, a second marks the goal's."""

    def get_input_info(self) -> int:
        """Return the side length of the grid."""
        return self.domain.dim

    def to_np(self, states: List[GridState], goals: List[GridGoal]) -> List[NDArray]:
        """Return a single array of shape (len(states), 2, dim, dim) filled with zeros and ones."""
        side: int = self.domain.dim
        planes: NDArray = np.zeros((len(states), 2, side, side))
        for row, (state, goal) in enumerate(zip(states, goals)):
            planes[row, 0, state.robot_x, state.robot_y] = 1  # plane 0: robot
            planes[row, 1, goal.robot_x, goal.robot_y] = 1  # plane 1: goal

        return [planes]


@heuristic_factory.register_class("gridnet_tut")
class GridNet(HeurNNet[GridNNetInput]):
    """Convolutional heuristic network that consumes the two-plane grid encoding."""

    @staticmethod
    def nnet_input_type() -> Type[GridNNetInput]:
        """Return the input class this network is built for."""
        return GridNNetInput

    def __init__(self, nnet_input: GridNNetInput, out_dim: int, q_fix: bool, chan_size: int = 8, fc_size: int = 100):
        super().__init__(nnet_input, out_dim, q_fix)
        # one hots
        # NOTE(review): this list is created empty and is not used anywhere else in this class.
        self.one_hots: nn.ModuleList = nn.ModuleList()
        grid_dim: int = self.nnet_input.get_input_info()

        # Layers are created in the same order in which they are applied.
        conv: nn.Module = Conv2dModel(2, [chan_size, chan_size], [3, 3], [1, 1], ["RELU", "RELU"],
                                      batch_norms=[True, True])
        flatten: nn.Module = nn.Flatten()
        # The input size assumes the convolutions keep the grid_dim x grid_dim spatial size
        # -- TODO confirm against Conv2dModel.
        hidden: nn.Module = FullyConnectedModel(grid_dim * grid_dim * chan_size, [fc_size], ["RELU"],
                                                batch_norms=[True])
        output: nn.Module = nn.Linear(fc_size, self.out_dim)
        self.heur: nn.Module = nn.Sequential(conv, flatten, hidden, output)

    def _forward(self, inputs: List[Tensor]) -> Tensor:
        """Run the network on the first (and only expected) input tensor."""
        return self.heur(inputs[0])


@heuristic_factory.register_parser("gridnet_tut")
class GridNetParser(DelimParser):
    """Parser for the ``gridnet_tut`` heuristic's command-line arguments."""

    def __init__(self) -> None:
        super().__init__()
        # "chan_size" and "fc_size" are the same names as GridNet.__init__'s optional parameters,
        # presumably the keywords the parsed values are passed as -- confirm against DelimParser.
        self.add_argument("ch", "chan_size", int, "number of channels")
        self.add_argument("fc", "fc_size", int, "size of fully connected layer")

    @property
    def delim(self) -> str:
        # Presumably the separator between several arguments -- confirm against DelimParser.
        return "_"

Timing and Debugging