Build New Env
This section walks through how we build new environments or simulations from scratch. We develop on both Windows (10/11) and Linux (Ubuntu), using PyCharm. Let’s get familiar with the project first.
Project Structure
The project structure is as follows:
bridges: where the ROS bridge, SUMO bridge and other co-simulation modules are located
documentation: the files used to build this documentation
metadrive: the metadrive package, most content is in this directory
The metadrive package has the following structure:
assets: downloaded automatically from GitHub releases, containing models, textures, and mini-batch real-world scenarios
base_class: meta-classes for making other Python classes; usually you don’t need to modify them
component: objects/vehicles/maps/lanes/buildings; almost all elements used for constructing a scenario are here
engine: it contains code regarding the simulation loop, top-down renderer, asset manager/loader, physics system, logger, skybox, shadow, and so on
envs: a collection of gym-style environments
examples: runnable scripts for making yourself familiar with MetaDrive
manager: managers that define how to create/forward a scene are stored here
obs: a collection of observations that define how to collect information from the simulator in each step
policy: a collection of policies that define how an object should act in each step
render_pipeline: it contains the deferred rendering pipeline, which is developed by tobspr
scenario: it defines the universal scenario format and a set of tools to read data from the format, like parsing object state
shaders: .glsl shaders for making skybox, terrain, depth camera and so on
tests: unit tests are located here, which can be viewed as examples as well
third_party: libraries developed by others
utils: various tool functions
constants.py: it defines constants and some properties used everywhere
type.py: all objects have a type label, which is selected from here
pull_asset: scripts to pull or update asset from the remote git release page
Start point-BaseEnv
To start making your own environment, the first step is to have something runnable so you can build things on top of it.
This can be done with BaseEnv, which is an empty environment with only a vehicle placed in it.
So just make a new your_env.py file and put the following code into it.
Note: we usually use the 3D renderer for development as it shows more details, but the 2D visualizer works as well. In this doc, we use the 2D renderer for convenience.
If you have a screen with OpenGL support, you can completely remove env.render and env.top_down_renderer.generate_gif. If OpenGL is not supported on your machine but a screen is still available, just turn off screen_record, set window=True for top_down_renderer, and remove env.top_down_renderer.generate_gif. Otherwise, just keep everything unchanged. Without a screen, the only way to visualize the environment is by generating GIFs.
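The three cases in the note can be summarized as a small decision helper. This is only an illustrative sketch: choose_render_settings and the keys call_topdown_render and generate_gif are hypothetical names used here to describe what to keep or remove in your script, not MetaDrive APIs. Only use_render, window, and screen_record correspond to options that appear in this section.

```python
def choose_render_settings(has_screen: bool, has_opengl: bool) -> dict:
    """Map the three cases from the note to rendering choices (hypothetical helper)."""
    if has_screen and has_opengl:
        # 3D rendering: env.render(...) and generate_gif() can be removed.
        return dict(use_render=True, call_topdown_render=False,
                    window=False, screen_record=False, generate_gif=False)
    if has_screen:
        # 2D top-down window: show frames live, no recording, no GIF.
        return dict(use_render=False, call_topdown_render=True,
                    window=True, screen_record=False, generate_gif=False)
    # No screen: record frames off-screen and export them as a GIF.
    return dict(use_render=False, call_topdown_render=True,
                window=False, screen_record=True, generate_gif=True)


if __name__ == "__main__":
    for screen, opengl in [(True, True), (True, False), (False, False)]:
        print(screen, opengl, choose_render_settings(screen, opengl))
```

The headless branch matches the settings used in the code of this section.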
from metadrive.envs import BaseEnv
from metadrive.obs.observation_base import DummyObservation
import logging


class MyEnv(BaseEnv):

    def reward_function(self, agent):
        return 0, {}

    def cost_function(self, agent):
        return 0, {}

    def done_function(self, agent):
        return False, {}

    def get_single_observation(self):
        return DummyObservation()


if __name__ == "__main__":
    # create env
    env = MyEnv(dict(use_render=False,  # if you have a screen and OpenGL support, you can set use_render=True to use 3D rendering
                     manual_control=True,  # we usually manually control the car to test the environment
                     log_level=logging.CRITICAL))  # suppress logging messages
    env.reset()
    for i in range(20):
        # step
        obs, reward, termination, truncate, info = env.step(env.action_space.sample())

        # you can set window=True and remove generate_gif() if you have a screen.
        # Or just use 3D rendering and remove all stuff related to env.render()
        frame = env.render(mode="topdown",
                           window=False,  # turn me on if you have a screen
                           screen_record=True,  # turn me off if a window can be popped up
                           screen_size=(200, 200))
    env.top_down_renderer.generate_gif()
    env.close()

from IPython.display import Image
Image(open("demo.gif", 'rb').read())
As shown in the figure above, this environment contains nothing. We will make it complete gradually.
Firstly, let’s add a map to the environment to make it a bit interesting.
Also, when we develop a new environment, the map and the map management module are the first things we build. This is because the map is the most important component and is tied to many other things such as observation and navigation. Thus, the map has to be created in a scene first, and the map management module map_manager should have the highest priority.
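To illustrate what "highest priority" means in practice, here is a minimal, self-contained sketch of ordering managers by a class-level PRIORITY attribute. It assumes that a smaller PRIORITY value means the manager is processed earlier, which is how the PRIORITY = 0 attribute of the map manager in this section reads. The toy classes, the default value of 10, and order_by_priority are hypothetical and are not MetaDrive code.

```python
class ToyManager:
    PRIORITY = 10  # assumed default for this sketch; smaller value = processed earlier


class ToyMapManager(ToyManager):
    PRIORITY = 0  # the map must exist before anything that depends on it


class ToyTrafficManager(ToyManager):
    PRIORITY = 5


def order_by_priority(managers: dict) -> list:
    """Return manager names sorted so that the smallest PRIORITY comes first."""
    return [name for name, _ in sorted(managers.items(), key=lambda kv: kv[1].PRIORITY)]


registered = {
    "traffic_manager": ToyTrafficManager(),
    "agent_manager": ToyManager(),
    "map_manager": ToyMapManager(),
}
print(order_by_priority(registered))
# → ['map_manager', 'traffic_manager', 'agent_manager']
```

Whatever order the managers are registered in, the map manager is processed first, so managers that need lanes or navigation information can rely on the map already being in the scene.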
MapManager
Compared to the last code cell, we add a map manager to generate different maps for each seed. Concretely, there are three maps, and we choose among them according to the global seed by map_id = global_seed % 3. New content added on top of the last code cell is marked between two ====== lines.
from metadrive.envs import BaseEnv
from metadrive.obs.observation_base import DummyObservation
import logging

# ======================================== new content ===============================================
import cv2
from metadrive.component.map.pg_map import PGMap
from metadrive.manager.base_manager import BaseManager
from metadrive.component.pgblock.first_block import FirstPGBlock


class MyMapManager(BaseManager):
    PRIORITY = 0

    def __init__(self):
        super(MyMapManager, self).__init__()
        self.current_map = None
        self.all_maps = {idx: None for idx in range(3)}  # store the created maps
        self._map_shape = ["X", "T", "O"]  # three types of maps

    def reset(self):
        idx = self.engine.global_random_seed % 3
        if self.all_maps[idx] is None:
            # create maps on the fly
            new_map = PGMap(map_config=dict(type=PGMap.BLOCK_SEQUENCE,
                                            config=self._map_shape[idx]))
            self.all_maps[idx] = new_map

        # attach the map to the world
        map = self.all_maps[idx]
        map.attach_to_world()
        self.current_map = map
        return dict(current_map=self._map_shape[idx])

    def before_reset(self):
        # detach the previous map before a new one is attached
        if self.current_map is not None:
            self.current_map.detach_from_world()
            self.current_map = None

    def destroy(self):
        # clear all maps when this manager is destroyed
        super(MyMapManager, self).destroy()
        for map in self.all_maps.values():
            if map is not None:
                map.destroy()
        self.all_maps = None


# Expand the default config system, specify where to spawn the car
MY_CONFIG = dict(agent_configs={"default_agent": dict(spawn_lane_index=(FirstPGBlock.NODE_1, FirstPGBlock.NODE_2, 0))})


class MyEnv(BaseEnv):

    @classmethod
    def default_config(cls):
        config = super(MyEnv, cls).default_config()
        config.update(MY_CONFIG)
        return config

    def setup_engine(self):
        super(MyEnv, self).setup_engine()
        self.engine.register_manager("map_manager", MyMapManager())

    # ======================================== new content ===============================================

    def reward_function(self, agent):
        return 0, {}

    def cost_function(self, agent):
        return 0, {}

    def done_function(self, agent):
        return False, {}

    def get_single_observation(self):
        return DummyObservation()


if __name__ == "__main__":
    frames = []

    # create env
    env = MyEnv(dict(use_render=False,  # if you have a screen and OpenGL support, you can set use_render=True to use 3D rendering
                     manual_control=True,  # we usually manually control the car to test the environment
                     num_scenarios=4,
                     log_level=logging.CRITICAL))  # suppress logging messages
    for i in range(4):
        # reset
        o, info = env.reset(seed=i)
        print("Load map with shape: {}".format(info["current_map"]))
        # you can set window=True and remove generate_gif() if you have a screen.
        # Or just use 3D rendering and remove all stuff related to env.render()
        frame = env.render(mode="topdown",
                           window=False,  # turn me on if you have a screen
                           scaling=3,
                           camera_position=(50, 0),
                           screen_size=(400, 400))
        frames.append(frame)
    cv2.imwrite("demo.png", cv2.cvtColor(cv2.hconcat(frames), cv2.COLOR_RGB2BGR))
    env.close()

from IPython.display import Image
Image(open("demo.png", 'rb').read())
The results show 4 scenarios, and the 4th scenario is the same as the first one because we cycle through the 3 maps according to MyMapManager.reset().
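The selection and caching logic can be checked in isolation with a small stand-alone sketch. LazyMapCache is a hypothetical stand-in for the all_maps bookkeeping in MyMapManager, and the strings it stores are placeholders for real map objects.

```python
class LazyMapCache:
    """Toy version of the seed -> map lookup with on-demand creation."""

    def __init__(self, shapes):
        self.shapes = list(shapes)
        self.cache = {idx: None for idx in range(len(self.shapes))}
        self.build_count = 0  # how many maps were actually created

    def get(self, global_seed: int) -> str:
        idx = global_seed % len(self.shapes)
        if self.cache[idx] is None:
            # Build the map only the first time this index is requested.
            self.build_count += 1
            self.cache[idx] = "map<{}>".format(self.shapes[idx])
        return self.cache[idx]


cache = LazyMapCache(["X", "T", "O"])
loaded = [cache.get(seed) for seed in range(4)]
print(loaded)             # → ['map<X>', 'map<T>', 'map<O>', 'map<X>']
print(cache.build_count)  # → 3
```

Seed 3 wraps around to index 0, so the first map is reused instead of being rebuilt.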
MyMapManager.reset() also returns the shape of the current map, which you can access in the returned info (the code above reads it from the info returned by env.reset()).
Thus, in your own development, you can collect simulation information and return it through env.step() by returning a dictionary from these functions: before_step, step, after_step, before_reset, reset, after_reset.
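The idea of managers contributing to info can be sketched with a toy loop that calls each stage on every manager and merges the returned dictionaries. This is a simplified illustration under assumptions: toy_env_step, MapReporter, and SpeedLogger are hypothetical, and the real engine may merge or key the returned values differently.

```python
class ToyManager:
    # Each stage returns a dict; an empty dict means "nothing to report".
    def before_step(self):
        return {}

    def step(self):
        return {}

    def after_step(self):
        return {}


class MapReporter(ToyManager):
    def before_step(self):
        return {"current_map": "X"}


class SpeedLogger(ToyManager):
    def after_step(self):
        return {"mean_speed": 12.5}


def toy_env_step(managers) -> dict:
    """Call every stage on every manager and merge what they return."""
    info = {}
    for stage in ("before_step", "step", "after_step"):
        for manager in managers:
            info.update(getattr(manager, stage)())
    return info


info = toy_env_step([MapReporter(), SpeedLogger()])
print(info)  # → {'current_map': 'X', 'mean_speed': 12.5}
```

In this sketch a later manager overwrites an earlier one if both return the same key, so distinct key names are the safer choice.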
Another thing to note is that we overwrite the original agent_configs of BaseEnv.
It defines where to spawn the agent with the id default_agent.
You can create maps automatically with PGMap, which combines some predefined blocks. Alternatively, you can use the more general map API ScenarioMap, which takes as input a dict defining lane centerlines, lane lines (solid/dashed), sidewalks, and crosswalks. More information about creating maps is at maps.
AgentManager
Action Space
Observation Space
The target you are interested in and want to control
Other Managers
Development tips
Remember to call removeNode() on every NodePath!
attachNewNode() also creates a NodePath, so those should be destroyed too!
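One way to follow these tips is to record every NodePath at creation time and remove them all in destroy(). The sketch below shows the bookkeeping pattern only. FakeNodePath is a hypothetical stand-in that mimics the attachNewNode and removeNode methods so the example runs without Panda3D, and TrackedObject is not a MetaDrive class.

```python
class FakeNodePath:
    """Stand-in for a Panda3D NodePath so this sketch runs without Panda3D."""

    def __init__(self, name):
        self.name = name
        self.removed = False

    def attachNewNode(self, name):
        # Like the real call, this creates and returns another node path.
        return FakeNodePath(name)

    def removeNode(self):
        self.removed = True


class TrackedObject:
    def __init__(self):
        self._node_paths = []
        self.origin = self._track(FakeNodePath("origin"))
        # Children created via attachNewNode are tracked as well.
        self.wheel = self._track(self.origin.attachNewNode("wheel"))

    def _track(self, node_path):
        self._node_paths.append(node_path)
        return node_path

    def destroy(self):
        # Remove children before parents, then forget all references.
        for node_path in reversed(self._node_paths):
            node_path.removeNode()
        self._node_paths.clear()


obj = TrackedObject()
origin, wheel = obj.origin, obj.wheel
obj.destroy()
print(origin.removed, wheel.removed)  # → True True
```

Routing every creation through a single _track helper makes it hard to forget a node when new parts are added to the object later.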