Build New Env

Click and Open In Colab

This section will walk through how we build new environments or simulations from scratch. We do the development on both Windows (10/11) and Linux (Ubuntu), using Pycharm. Let’s get familiar with the project first

Project Structure

The project structure is as follows:

  • bridges: where the ros bridge, sumo bridge and other co-simulation modules are located

  • documentation: It contains files building this documentation

  • metadrive: the metadrive package, most content is in this directory

The metadrive package has the following structure:

  • assets: it is downloaded from github releases automatically, containing models, textures, and mini-batch real-world scenarios

  • base_class: meta-classes for making other python classes, usually you don’t need to modify it

  • component: objects/vehicles/maps/lanes/building, almost all elements used for constructing scenario is here

  • engine: it contains code regarding the simulation loop, top-down renderer, asset manager/loader, physics system, logger, skybox, shadow, and so on

  • envs: a collection of gym-style environments

  • examples: runnable scripts for making yourself familiar with MetaDrive

  • manager: managers that defines how to create/forward scene are stored here

  • obs: a collection of observations that define how to collect information from the simulator in each step

  • policy: a collection of policies that define how an object should act in each step

  • render_pipeline: it contains the deferred rendering pipeline, which is developed by tobspr

  • scenario: it defines the universal scenario format and a set of tools to read data from the format, like parsing object state

  • shaders: .glsl shaders for making skybox, terrain, depth camera and so on

  • tests: unitest are located here, which can be viewed as examples as well

  • third_party: libraries developed by others

  • utils: various tool functions

  • constants.py: it defines constants and some properties used everywhere

  • type.py: all objects have a type label, which is selected from here

  • pull_asset: scripts to pull or update asset from the remote git release page

Start point-BaseEnv

To start making your own environment, the first step is to have something runnable so you can build things on top of it. This can be done with BaseEnv, which is an empty environment with only a vehicle placed in this environment. So just make a new your_env.py file and put the following code into it. Note: we usually use 3D renderer to do development as it shows more details, but using 2D visualizer is allowed as well. In this doc, we use 2D renderer for convinience. If you have a screen with OpenGL support, you can completely remove the env.render and env.top_down_renderer.generate_gif. If OpenGL is not supported on your machine but a screen is still available, just turn off the screen_record and set window=True for top_down_renderer and remove env.top_down_renderer.generate_gif. Otherwise, just keep everying unchanged. Without a screen, the only way to visualize the environment is through generating GIFs.

from metadrive.envs import BaseEnv
from metadrive.obs.observation_base import DummyObservation
import logging

class MyEnv(BaseEnv):

    def reward_function(self, agent):
        return 0, {}

    def cost_function(self, agent):
        return 0, {}

    def done_function(self, agent):
        return False, {}
    
    def get_single_observation(self):
        return DummyObservation()
        

if __name__=="__main__":
    # create env
    env=MyEnv(dict(use_render=False, # if you have a screen and OpenGL suppor, you can set use_render=True to use 3D rendering  
                   manual_control=True, # we usually manually control the car to test environment
                   log_level=logging.CRITICAL)) # suppress logging message
    env.reset()
    for i in range(20):
        
        # step
        obs, reward, termination, truncate, info = env.step(env.action_space.sample())
        
        # you can set window=True and remove generate_gif() if you have a screen. 
        # Or just use 3D rendering and remove all stuff related to env.render()  
        frame=env.render(mode="topdown", 
                         window=False, # turn me on, if you have screen
                         screen_record=True, # turn me off, if a window can be poped up
                         screen_size=(200, 200))
    env.top_down_renderer.generate_gif()
    env.close()

from IPython.display import Image
Image(open("demo.gif", 'rb').read())
error: XDG_RUNTIME_DIR not set in the environment.
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[1], line 25
     20 if __name__=="__main__":
     21     # create env
     22     env=MyEnv(dict(use_render=False, # if you have a screen and OpenGL suppor, you can set use_render=True to use 3D rendering  
     23                    manual_control=True, # we usually manually control the car to test environment
     24                    log_level=logging.CRITICAL)) # suppress logging message
---> 25     env.reset()
     26     for i in range(20):
     27         
     28         # step
     29         obs, reward, termination, truncate, info = env.step(env.action_space.sample())

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:542, in BaseEnv.reset(self, seed)
    539 assert (len(self.agents) == self.num_agents) or (self.num_agents == -1), \
    540     "Agents: {} != Num_agents: {}".format(len(self.agents), self.num_agents)
    541 assert self.config is self.engine.global_config is get_global_config(), "Inconsistent config may bring errors!"
--> 542 return self._get_reset_return(reset_info)

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:568, in BaseEnv._get_reset_return(self, reset_info)
    565 def _get_reset_return(self, reset_info):
    566     # TODO: figure out how to get the information of the before step
    567     scene_manager_before_step_infos = reset_info
--> 568     scene_manager_after_step_infos = self.engine.after_step()
    570     obses = {}
    571     done_infos = {}

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/base_engine.py:462, in BaseEngine.after_step(self, *args, **kwargs)
    460     assert list(self.managers.keys())[-1] == "record_manager", "Record Manager should have lowest priority"
    461 for manager in self.managers.values():
--> 462     new_step_info = manager.after_step(*args, **kwargs)
    463     step_infos = concat_step_infos([step_infos, new_step_info])
    464 self.interface.after_step()

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/manager/base_manager.py:309, in BaseAgentManager.after_step(self, *args, **kwargs)
    307 def after_step(self, *args, **kwargs):
    308     step_infos = self.try_actuate_agent({}, stage="after_step")
--> 309     step_infos.update(self.for_each_active_agents(lambda v: v.after_step()))
    310     return step_infos

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/manager/base_manager.py:402, in BaseAgentManager.for_each_active_agents(self, func, *args, **kwargs)
    400 ret = dict()
    401 for k, v in self.active_agents.items():
--> 402     ret[k] = func(v, *args, **kwargs)
    403 return ret

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/manager/base_manager.py:309, in BaseAgentManager.after_step.<locals>.<lambda>(v)
    307 def after_step(self, *args, **kwargs):
    308     step_infos = self.try_actuate_agent({}, stage="after_step")
--> 309     step_infos.update(self.for_each_active_agents(lambda v: v.after_step()))
    310     return step_infos

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/component/vehicle/base_vehicle.py:254, in BaseVehicle.after_step(self)
    242 my_policy = self.engine.get_policy(self.name)
    243 step_info.update(
    244     {
    245         "velocity": float(self.speed),
   (...)
    251     }
    252 )
--> 254 lanes_heading = self.navigation.navi_arrow_dir
    255 lane_0_heading = lanes_heading[0]
    256 lane_1_heading = lanes_heading[1]

AttributeError: 'NoneType' object has no attribute 'navi_arrow_dir'

As shown in the figure above, this environment contains nothing. We will make it complete gradually. Firstly, let’s add a map to the environment to make it a bit interesting. Also, when we develop a new environment, developing map and map management module is the first step we do. This is because map is the most important component associated with a lot of things like observation and navigation. Thus, the map has to be created in a scene firstly and the map management module map_manager should have the highest priority.

MapManager

Compared to the last code cell, we add a map manager to generate different maps for each seed. Concretely, there are three maps and we choose from the three maps according to the global seed by map_id = global_seed % 3. New content added upon the last code cell are marked between two ======.

from metadrive.envs import BaseEnv
from metadrive.obs.observation_base import DummyObservation
import logging

# ======================================== new content ===============================================
import cv2
from metadrive.component.map.pg_map import PGMap
from metadrive.manager.base_manager import BaseManager
from metadrive.component.pgblock.first_block import FirstPGBlock

class MyMapManager(BaseManager):
    PRIORITY = 0

    def __init__(self):
        super(MyMapManager, self).__init__()
        self.current_map = None
        self.all_maps = {idx: None for idx in range(3)} # store the created map
        self._map_shape = ["X", "T", "O"] # three types of maps 

    def reset(self):
        idx = self.engine.global_random_seed % 3
        if self.all_maps[idx] is None:
            # create maps on the fly
            new_map = PGMap(map_config=dict(type=PGMap.BLOCK_SEQUENCE,
                                            config=self._map_shape[idx]))
            self.all_maps[idx] = new_map

        # attach map in the world
        map = self.all_maps[idx]
        map.attach_to_world()
        self.current_map = map
        return dict(current_map=self._map_shape[idx])

    def before_reset(self):
        if self.current_map is not None:
            self.current_map.detach_from_world()
            self.current_map = None

    def destroy(self):
        # clear all maps when this manager is destroyed
        super(MyMapManager, self).destroy()
        for map in self.all_maps.values():
            if map is not None:
                map.destroy()
        self.all_maps = None


# Expand the default config system, specify where to spawn the car
MY_CONFIG = dict(agent_configs={"default_agent": dict(spawn_lane_index=(FirstPGBlock.NODE_1, FirstPGBlock.NODE_2, 0))}) 


class MyEnv(BaseEnv):
    
    @classmethod
    def default_config(cls):
        config = super(MyEnv, cls).default_config()
        config.update(MY_CONFIG)
        return config
    
    def setup_engine(self):
        super(MyEnv, self).setup_engine()
        self.engine.register_manager("map_manager", MyMapManager())
        
# ======================================== new content ===============================================

    def reward_function(self, agent):
        return 0, {}

    def cost_function(self, agent):
        return 0, {}

    def done_function(self, agent):
        return False, {}
    
    def get_single_observation(self):
        return DummyObservation()
        

if __name__=="__main__":
    frames = []
    
    # create env
    env=MyEnv(dict(use_render=False, # if you have a screen and OpenGL suppor, you can set use_render=True to use 3D rendering  
                   manual_control=True, # we usually manually control the car to test environment
                   num_scenarios=4,
                   log_level=logging.CRITICAL)) # suppress logging message
    for i in range(4):
        
        # reset
        o, info = env.reset(seed=i)
        print("Load map with shape: {}".format(info["current_map"]))
        # you can set window=True and remove generate_gif() if you have a screen. 
        # Or just use 3D rendering and remove all stuff related to env.render()  
        frame=env.render(mode="topdown", 
                         window=False, # turn me on, if you have screen
                         scaling=3,
                         camera_position=(50, 0),
                         screen_size=(400, 400))
        frames.append(frame)
    cv2.imwrite("demo.png",  cv2.cvtColor(cv2.hconcat(frames), cv2.COLOR_RGB2BGR))
    env.close()

from IPython.display import Image
Image(open("demo.png", 'rb').read())

The results show 4 scenarios and the 4th scenario is the same as the first one, as we repeatedly load the 3 maps according to MyMapManager.reset(). This function also returns the shape of the current map, which you can access in the info returned by env.step(). Thus, in your own development, you can collect simulation information and return them by env.step() by returning a dictionary in these functions: before_step, step, after_step, before_reset, reset, after_reset. Another thing to take care of is that we overwrite the original agent_configs of BaseEnv. It defines where to spawn the agent with the id default_agent. You can create maps automatically with PGMap which combines some predefined blocks. Also, you can use the more general map API ScenarioMap which takes a dict defining lane centerline, lane lines (solid/dash), sidewalks, and crosswalks as input. More information about creating maps is at maps.

AgentManager

Action Space

Observation Space

The target you are interested in and wanna control

Other Managers

Development tips

  1. Remember to call RemoveNode of all NodePath!

  2. attachNewNode will also create NodePath so they should be destroyed too!