Hands-On Python Guide To AllenAct – An Embodied AI Project

Introduction

AllenAct is an open-source, modular learning framework designed for researchers and enthusiasts working in the domain of Embodied AI. It provides state-of-the-art reproductions of numerous embodied AI models, and supports a growing collection of embodied AI tasks, the algorithms used to accomplish those tasks, and the environments in which they run.

Are you unfamiliar with the term ‘Embodied AI’? Refer to the ‘Overview of Embodied AI’ section of this article before proceeding!

AllenAct was introduced by members of the PRIOR (Perceptual Reasoning and Interaction Research) group at the Allen Institute for Artificial Intelligence (AI2) in August 2020. AI2 is a non-profit research institute headquartered in Seattle, Washington (United States). It was founded in 2014 by Microsoft co-founder Paul G. Allen with the aim of conducting influential research in the field of AI. (Check out Paul Allen’s personal website here.) AI2 is led by the prominent AI researcher Dr. Oren Etzioni.

Captivating features of AllenAct

AllenAct supports multiple embodied AI environments, including iTHOR, RoboTHOR and Habitat. Click here for instructions to install any of these supported environments.

  • AllenAct decouples tasks from environments, so multiple tasks can be run in the same environment.
  • It supports a variety of algorithms; to name a few – PPO, DD-PPO, A2C and DAgger.
  • It supports algorithms and tasks involving multiple embodied agents.
  • It enables easy visualization of first- and third-person views for agents, as well as intermediate model tensors, integrated into TensorBoard.
  • It is among the handful of Reinforcement Learning frameworks that support PyTorch.
  • It allows models to be trained with a sequence of algorithms.
  • It makes it easy to combine multiple loss functions during model training.

Installing AllenAct

Requirements: a working installation of Python 3.6 or later

Clone the GitHub repository to your local machine 

git clone git@github.com:allenai/allenact.git

Move to the top directory 

cd allenact

Install requirements using pipenv

pipenv install --skip-lock --dev

Alternatively, install the requirements using pip or pip3 (as per your machine’s configuration)

pip install -r requirements.txt

Visit this page for detailed installation instructions.

Practical implementation

Here’s a demonstration of training an embodied agent on the Point Navigation task within the RoboTHOR Embodied-AI environment.

Before moving on to the implementation, let us have a look at some of the underlying concepts you may be unaware of.

What is a Point Navigation task?

The most basic skill an embodied agent needs for accomplishing any task is knowing how to “move around” in its environment. This task of moving around is formulated as follows:

Make your agent locate a beacon placed somewhere in the environment. This beacon transmits its location in such a way that, at any point in time, the agent knows the direction in which it needs to proceed and the Euclidean distance it needs to cover in order to reach the beacon. This particular task is termed Point Navigation (also known as PointNav).
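
To make this concrete, here is a minimal, self-contained sketch (plain NumPy with made-up coordinates, not AllenAct code) of the two quantities the beacon effectively provides at every step: the Euclidean distance still to be covered, and the direction of the goal relative to the agent’s current heading.

import numpy as np

# Positions on the floor plane (x, z) in meters; purely illustrative values.
agent_position = np.array([1.0, 2.0])
goal_position = np.array([4.0, 6.0])
agent_heading_deg = 90.0  # direction the agent is currently facing

# Euclidean (straight-line) distance from the agent to the beacon.
distance_to_goal = np.linalg.norm(goal_position - agent_position)  # 5.0 meters

# Direction of the beacon relative to the agent's current heading.
dx, dz = goal_position - agent_position
relative_angle_deg = np.degrees(np.arctan2(dz, dx)) - agent_heading_deg

print(distance_to_goal, relative_angle_deg)  # 5.0  -36.87 (approximately)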

PointNav, though it appears simple, is a challenging task for trained agents. The reason is that such agents are not trained in a free, open space; rather, training is carried out in environments resembling real-world settings, where agents must navigate around obstacles such as walls, doors and furniture. If they are not familiar with the floor plan of the environment they move in, they need to learn to predict the layout of such structures in order to navigate properly.

What is an environment?

An environment is the world in which the trained agent exists and performs actions such as moving forward, turning right and so on.

Simulator used

Here, to build a point navigator, we use the RoboTHOR simulator. RoboTHOR has been designed to train models that can easily be transferred to a real robot, by providing a photo-realistic virtual environment and a real-world replica of that environment. It comprises 60 virtual training scenes with varying floor plans and furniture, along with 15 validation scenes.

AllenAct’s ‘Environment’

AllenAct has a class abstraction by the name Environment. It is a shallow wrapper which provides a uniform interface to the actual environment the agent works in.

Algorithm used

We use a reinforcement learning algorithm called DD-PPO, a decentralized and distributed variant of the PPO algorithm. We let our agent explore the environment on its own, reward it for taking actions that bring it closer to the goal, and penalize it for actions that take it away from the goal. We then optimize the model to maximize this reward.
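
As a simplified illustration of this reward scheme (a sketch only; the constants mirror the REWARD_CONFIG defined later in this tutorial, but the formula is not the exact one used internally), the per-step reward can be composed of a small step penalty, a shaping term proportional to the progress made towards the goal, and a large bonus on success:

# Illustrative shaped reward for PointNav; not the exact AllenAct implementation.
def step_reward(prev_distance, new_distance, reached_goal,
                step_penalty=-0.01, goal_success_reward=10.0, shaping_weight=1.0):
    reward = step_penalty                                      # discourage wandering forever
    reward += shaping_weight * (prev_distance - new_distance)  # reward progress towards the goal
    if reached_goal:
        reward += goal_success_reward                          # large bonus for finishing the task
    return reward

print(step_reward(prev_distance=5.0, new_distance=4.75, reached_goal=False))  # 0.24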

Install RoboTHOR environment

We need to set up the environment in which our embodied agent will perform point navigation. Click here for the environment installation steps.

Understand the dataset used

We have used the RoboTHOR PointNav dataset. It can be downloaded from here.

It consists of several episodes with randomly generated start and end points for each scene. It also contains cached distances between every pair of points in every scene, so that progress can be measured in terms of geodesic distance, i.e. the actual path distance rather than the straight-line distance.
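
To give a feel for what such an episode looks like, below is an illustrative record (the field names and values are hypothetical and not the exact dataset schema):

# Hypothetical structure of a single PointNav episode record (for illustration only).
episode = {
    "scene": "FloorPlan_Train1_1",                        # which RoboTHOR scene to load
    "initial_position": {"x": 3.0, "y": 0.9, "z": 1.5},   # randomly generated start point
    "target_position": {"x": -1.25, "y": 0.9, "z": 4.0},  # randomly generated goal point
    "shortest_path_length": 7.25,                         # cached geodesic (walkable-path) distance in meters
}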

Code

Set up the experiment config file:

Unlike a typical Python library, which you simply import and call, AllenAct is structured as a framework with a runner script called main.py. This script runs the experiment specified in a config file, which keeps an exact record of the settings used to arrive at a particular result. Since RL models are generally expensive to train, this record-keeping helps avoid wasted runs and reduces costs to some extent.

Create a new directory under projects/.
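
For example, you could create the directory and the config file like this (the names below are only a suggestion; the file name, without the .py extension, later serves as the experiment name passed to main.py):

mkdir -p projects/pointnav_robothor_tutorial
touch projects/pointnav_robothor_tutorial/pointnav_robothor_rgb_ddppo.py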

Import the required libraries and classes.

import glob
import os
from math import ceil
from typing import Dict, Any, List, Optional, Sequence

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR
from torchvision import models

from constants import ABS_PATH_OF_TOP_LEVEL_DIR
from core.algorithms.onpolicy_sync.losses import PPO
from core.algorithms.onpolicy_sync.losses.ppo import PPOConfig
from core.base_abstractions.experiment_config import ExperimentConfig, MachineParams
from core.base_abstractions.preprocessor import (
    ResNetPreprocessor,
    SensorPreprocessorGraph,
)
from core.base_abstractions.sensor import SensorSuite
from core.base_abstractions.task import TaskSampler
from plugins.ithor_plugin.ithor_sensors import RGBSensorThor
from plugins.robothor_plugin.robothor_sensors import GPSCompassSensorRoboThor
from plugins.robothor_plugin.robothor_task_samplers import PointNavDatasetTaskSampler
from plugins.robothor_plugin.robothor_tasks import PointNavTask
from projects.pointnav_baselines.models.point_nav_models import (
    ResnetTensorPointNavActorCritic,
)
from utils.experiment_utils import (
    Builder,
    PipelineStage,
    TrainingPipeline,
    LinearDecay,
    evenly_distribute_count_into_bins,
)

Define an experiment config class

class PointNavRoboThorRGBPPOExperimentConfig(ExperimentConfig):
    """A Point Navigation experiment in the RoboTHOR environment."""

Define the task parameters

    # Maximum number of steps the agent can take before being reset
    # (so that it does not keep moving around forever)
    MAX_STEPS = 500

    # Configuration of the reward function
    REWARD_CONFIG = {
        # Penalty applied at every step, encouraging the agent to reach the goal quickly
        "step_penalty": -0.01,
        # Reward given for successfully reaching the goal
        "goal_success_reward": 10.0,
        # Reward given if the agent selects the 'stop' action before reaching the goal
        "failed_stop_reward": 0.0,
        # Strength of the shaping signal the agent gets for approaching or moving away from the goal
        "shaping_weight": 1.0,
    }

Set the simulator parameters

    CAMERA_WIDTH = 640
    CAMERA_HEIGHT = 480
    SCREEN_SIZE = 224

Set the hardware parameters for the training engine

    ADVANCE_SCENE_ROLLOUT_PERIOD: Optional[int] = None
    NUM_PROCESSES = 20
    TRAINING_GPUS: Sequence[int] = [0]
    VALIDATION_GPUS: Sequence[int] = [0]
    TESTING_GPUS: Sequence[int] = [0]

Where,

NUM_PROCESSES is the total number of parallel processes used to train the model.

TRAINING_GPUS takes the IDs of the GPUs on which the model should be trained.

Likewise, VALIDATION_GPUS and TESTING_GPUS take the IDs of the GPUs on which validation and testing will occur, respectively.

Define the paths to store the downloaded dataset 

    TRAIN_DATASET_DIR = os.path.join(
        ABS_PATH_OF_TOP_LEVEL_DIR, "datasets/robothor-pointnav/debug"
    )
    VAL_DATASET_DIR = os.path.join(
        ABS_PATH_OF_TOP_LEVEL_DIR, "datasets/robothor-pointnav/debug"
    )

Define the sensors

    SENSORS = [
        # RGBSensorThor is the environment's implementation of an RGB sensor.
        # It takes the raw image produced by the simulator and resizes it
        # to the specified input dimensions.
        RGBSensorThor(
            height=SCREEN_SIZE,
            width=SCREEN_SIZE,
            use_resnet_normalization=True,
            uuid="rgb_lowres",
        ),
        # GPSCompassSensorRoboThor tells the agent the direction and distance
        # to its goal at every time step.
        GPSCompassSensorRoboThor(),
    ]

Define the preprocessor to be used with the model

    PREPROCESSORS = [
        Builder(
            ResNetPreprocessor,
            {
                "input_height": SCREEN_SIZE,
                "input_width": SCREEN_SIZE,
                "output_width": 7,
                "output_height": 7,
                "output_dims": 512,
                "pool": False,
                "torchvision_resnet_model": models.resnet18,
                "input_uuids": ["rgb_lowres"],
                "output_uuid": "rgb_resnet",
            },
        ),
    ]

The preprocessor abstraction in AllenAct is designed for models that transform the raw pixels observed by the agent in the environment into a compact embedding. The embedded version, rather than the original image, is then used as input to the trainable model.
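
As a rough illustration of what this preprocessor produces (a standalone torchvision sketch, not the actual ResNetPreprocessor code), a 224 x 224 RGB image passed through the convolutional part of resnet18 yields a 512 x 7 x 7 feature map, matching the output_dims, output_height and output_width set above:

import torch
import torch.nn as nn
from torchvision import models

# Keep everything up to (but excluding) the average-pool and fully-connected layers.
resnet = models.resnet18(pretrained=True)
backbone = nn.Sequential(*list(resnet.children())[:-2])

image_batch = torch.rand(1, 3, 224, 224)  # a fake 224 x 224 RGB observation
with torch.no_grad():
    embedding = backbone(image_batch)
print(embedding.shape)  # torch.Size([1, 512, 7, 7])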

Define the observation inputs to be used by the model

    OBSERVATIONS = [
        # UUIDs of the (preprocessed) sensor outputs to be fed to the model
        "rgb_resnet",
        "target_coordinates_ind",
    ]

Define settings of the simulator

    ENV_ARGS = dict(
        width=CAMERA_WIDTH,
        height=CAMERA_HEIGHT,
        # Every time the agent moves forward, it advances by 0.25 meters
        gridSize=0.25,
        # Every time the agent takes a 'turn' action, it rotates by 30 degrees
        rotateStepDegrees=30.0,
        # Distance (in meters) within which objects are considered visible to the agent
        visibilityDistance=1.0,
    )

Define a method that returns the name of the experiment

    @classmethod
    def tag(cls):
        return "PointNavRobothorRGBPPO"

Define the model training pipeline

    @classmethod
    def training_pipeline(cls, **kwargs):
        ppo_steps = int(250000000)  # total number of training steps
        lr = 3e-4
        num_mini_batch = 1
        update_repeats = 3
        num_steps = 30  # rollout length
        # How often we save the model weights and run validation on them
        save_interval = 5000000
        log_interval = 1000
        gamma = 0.99
        use_gae = True
        gae_lambda = 0.95
        max_grad_norm = 0.5
        return TrainingPipeline(
            save_interval=save_interval,
            # Frequency at which data is accumulated from all the processes and logged
            metric_accumulate_interval=log_interval,
            optimizer_builder=Builder(optim.Adam, dict(lr=lr)),
            num_mini_batch=num_mini_batch,
            update_repeats=update_repeats,
            max_grad_norm=max_grad_norm,
            num_steps=num_steps,
            named_losses={"ppo_loss": PPO(**PPOConfig)},
            gamma=gamma,
            use_gae=use_gae,
            gae_lambda=gae_lambda,
            advance_scene_rollout_period=cls.ADVANCE_SCENE_ROLLOUT_PERIOD,
            pipeline_stages=[
                PipelineStage(loss_names=["ppo_loss"], max_stage_steps=ppo_steps)
            ],
            lr_scheduler_builder=Builder(
                LambdaLR, {"lr_lambda": LinearDecay(steps=ppo_steps)}
            ),
        )

Define a method to return the hardware parameters of each process, based on the list of devices defined above

    def machine_params(self, mode="train", **kwargs):
        sampler_devices: List[int] = []
        if mode == "train":
            workers_per_device = 1
            gpu_ids = (
                []
                if not torch.cuda.is_available()
                else list(self.TRAINING_GPUS) * workers_per_device
            )
            nprocesses = (
                8
                if not torch.cuda.is_available()
                else evenly_distribute_count_into_bins(self.NUM_PROCESSES, len(gpu_ids))
            )
            sampler_devices = list(self.TRAINING_GPUS)
        elif mode == "valid":
            nprocesses = 1
            gpu_ids = [] if not torch.cuda.is_available() else self.VALIDATION_GPUS
        elif mode == "test":
            nprocesses = 1
            gpu_ids = [] if not torch.cuda.is_available() else self.TESTING_GPUS
        else:
            raise NotImplementedError("mode must be 'train', 'valid', or 'test'.")
        sensor_preprocessor_graph = (
            SensorPreprocessorGraph(
                source_observation_spaces=SensorSuite(self.SENSORS).observation_spaces,
                preprocessors=self.PREPROCESSORS,
            )
            if mode == "train"
            or (
                (isinstance(nprocesses, int) and nprocesses > 0)
                or (isinstance(nprocesses, Sequence) and sum(nprocesses) > 0)
            )
            else None
        )
        return MachineParams(
            nprocesses=nprocesses,
            devices=gpu_ids,
            sampler_devices=sampler_devices
            if mode == "train"
            else gpu_ids,  # ignored with > 1 gpu_ids
            sensor_preprocessor_graph=sensor_preprocessor_graph,
        )

Define the actual model

We take a model from the pointnav_baselines project. It is a small convolutional network that consumes the ResNet embedding of the RGB input, followed by a single-layer GRU. The number of different actions the agent can perform in the environment is passed to the model through the action_space parameter, which we get from the task definition.
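
The config class therefore needs a create_model method that builds this network. A sketch of it, based on AllenAct's pointnav_baselines code (the exact argument names and the hidden_size/goal_dims values are indicative and may differ slightly between AllenAct versions), looks like this:

    @classmethod
    def create_model(cls, **kwargs) -> nn.Module:
        return ResnetTensorPointNavActorCritic(
            # Number of discrete actions the agent can take, from the task definition
            action_space=gym.spaces.Discrete(len(PointNavTask.class_action_names())),
            # Observation spaces produced by the sensor preprocessor graph defined above
            observation_space=kwargs["sensor_preprocessor_graph"].observation_spaces,
            goal_sensor_uuid="target_coordinates_ind",
            rgb_resnet_preprocessor_uuid="rgb_resnet",
            hidden_size=512,  # size of the GRU hidden state
            goal_dims=32,     # size of the goal-coordinate embedding
        )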

Define the task sampler

    @classmethod
    def make_sampler_fn(cls, **kwargs) -> TaskSampler:
        return PointNavDatasetTaskSampler(**kwargs)

This generates instances of the task for our agent to perform. It reads the specified dataset file and, whenever the agent exceeds the maximum number of steps or selects the stop action, resets the agent to the next starting location.

Define a few helper functions to distribute the work if you have several GPUs and many scenes to process

    @staticmethod
    def _partition_inds(n: int, num_parts: int):
        return np.round(np.linspace(0, n, num_parts + 1, endpoint=True)).astype(
            np.int32
        )

    def _get_sampler_args_for_scene_split(
        self,
        scenes_dir: str,
        process_ind: int,
        total_processes: int,
        seeds: Optional[List[int]] = None,
        deterministic_cudnn: bool = False,
    ) -> Dict[str, Any]:
        path = os.path.join(scenes_dir, "*.json.gz")
        scenes = [scene.split("/")[-1].split(".")[0] for scene in glob.glob(path)]
        if len(scenes) == 0:
            raise RuntimeError(
                (
                    "Could find no scene dataset information in directory {}."
                    " Are you sure you've downloaded them? If not, see"
                    " https://allenact.org/installation/download-datasets/"
                    " for information on how this can be done."
                ).format(scenes_dir)
            )
        if total_processes > len(scenes):  # oversample some scenes -> bias
            if total_processes % len(scenes) != 0:
                print(
                    "Warning: oversampling some of the scenes to feed all processes."
                    " You can avoid this by setting a number of workers divisible"
                    " by the number of scenes."
                )
            scenes = scenes * int(ceil(total_processes / len(scenes)))
            scenes = scenes[: total_processes * (len(scenes) // total_processes)]
        else:
            if len(scenes) % total_processes != 0:
                print(
                    "Warning: oversampling some of the scenes to feed all processes."
                    " You can avoid this by setting a number of workers that is a"
                    " divisor of the number of scenes."
                )
        inds = self._partition_inds(len(scenes), total_processes)
        return {
            "scenes": scenes[inds[process_ind] : inds[process_ind + 1]],
            "max_steps": self.MAX_STEPS,
            "sensors": self.SENSORS,
            "action_space": gym.spaces.Discrete(len(PointNavTask.class_action_names())),
            "seed": seeds[process_ind] if seeds is not None else None,
            "deterministic_cudnn": deterministic_cudnn,
            "rewards_config": self.REWARD_CONFIG,
        }

Define the task sampler arguments

The arguments include the location of the dataset as well as a reference to the distance cache and the environmental arguments for the simulator mentioned above.

    def train_task_sampler_args(
        self,
        process_ind: int,
        total_processes: int,
        devices: Optional[List[int]] = None,
        seeds: Optional[List[int]] = None,
        deterministic_cudnn: bool = False,
    ) -> Dict[str, Any]:
        res = self._get_sampler_args_for_scene_split(
            os.path.join(self.TRAIN_DATASET_DIR, "episodes"),
            process_ind,
            total_processes,
            seeds=seeds,
            deterministic_cudnn=deterministic_cudnn,
        )
        res["scene_directory"] = self.TRAIN_DATASET_DIR
        res["loop_dataset"] = True
        res["env_args"] = {}
        res["env_args"].update(self.ENV_ARGS)
        res["env_args"]["x_display"] = (
            ("0.%d" % devices[process_ind % len(devices)])
            if devices is not None and len(devices) > 0
            else None
        )
        res["allow_flipping"] = True
        return res

    def valid_task_sampler_args(
        self,
        process_ind: int,
        total_processes: int,
        devices: Optional[List[int]] = None,
        seeds: Optional[List[int]] = None,
        deterministic_cudnn: bool = False,
    ) -> Dict[str, Any]:
        res = self._get_sampler_args_for_scene_split(
            os.path.join(self.VAL_DATASET_DIR, "episodes"),
            process_ind,
            total_processes,
            seeds=seeds,
            deterministic_cudnn=deterministic_cudnn,
        )
        res["scene_directory"] = self.VAL_DATASET_DIR
        res["loop_dataset"] = False
        res["env_args"] = {}
        res["env_args"].update(self.ENV_ARGS)
        res["env_args"]["x_display"] = (
            ("0.%d" % devices[process_ind % len(devices)])
            if devices is not None and len(devices) > 0
            else None
        )
        return res

    def test_task_sampler_args(
        self,
        process_ind: int,
        total_processes: int,
        devices: Optional[List[int]] = None,
        seeds: Optional[List[int]] = None,
        deterministic_cudnn: bool = False,
    ) -> Dict[str, Any]:
        res = self._get_sampler_args_for_scene_split(
            os.path.join(self.VAL_DATASET_DIR, "episodes"),
            process_ind,
            total_processes,
            seeds=seeds,
            deterministic_cudnn=deterministic_cudnn,
        )
        res["scene_directory"] = self.VAL_DATASET_DIR
        res["loop_dataset"] = False
        res["env_args"] = {}
        res["env_args"].update(self.ENV_ARGS)
        return res

Model training

python main.py -o <PATH_TO_OUTPUT> -c -b <BASE_DIRECTORY_OF_YOUR_EXPERIMENT> <EXPERIMENT_NAME>
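
For instance, assuming the config file suggested earlier (projects/pointnav_robothor_tutorial/pointnav_robothor_rgb_ddppo.py) and omitting the optional -c flag, the command might look like:

python main.py -o storage/pointnav-robothor-rgb -b projects/pointnav_robothor_tutorial pointnav_robothor_rgb_ddppo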

NOTE: Training on the entire dataset takes nearly 2 days on a machine with 8 GPUs! However, you can train on a smaller dataset simply by changing the dataset path in the config above.

Model testing

Visit this page for the steps to test a pre-trained AllenAct model.

Source: https://allenact.org/tutorials/training-a-pointnav-model

End note

In this article, we gave an overview of AllenAct, a state-of-the-art framework for the Embodied AI domain. We also demonstrated how to implement the PointNav task using this modular framework.

To get a deeper understanding of the AllenAct project, refer to the official documentation at allenact.org and the allenai/allenact GitHub repository.
