Observation

Click and Open In Colab

_images/observation_demo.png

MetaDrive provides various kinds of sensory input, as illustrated in the next figure. For low-level sensors, RGB cameras, depth cameras, semantic camera, instance camera and Lidar can be placed anywhere in the scene with adjustable parameters such as view field and the laser number. Meanwhile, the high-level scene information including the road information and nearby vehicles’ information like velocity and heading can also be provided as the observation.

Note that MetaDrive aims at providing an efficient platform to benchmark RL research, therefore we improve the simulation efficiency at the cost of photorealistic rendering effect.

In this page, we describe the optional observation forms in current MetaDrive version and discuss how to implement new forms of observation subject to your own tasks. There are three kinds of observations we usually used for training agents:

  • LidarStateObservation

  • ImageStateObservation

  • TopDownObservation

By default, the observation is LidarStateObservation.

LidarStateObservation

This observation returns a state vector containing necessary information for navigation tasks. We use this state vector in almost all existing RL experiments such as the Generalization, MARL and Safe RL experiments. The state vector consist of three parts:

  1. Ego State: current states such as the steering, heading, velocity and relative distance to boundaries, implemented in the vehicle_state function of StateObservation. Please find the detailed meaning of each state dimension in the code.

  2. Navigation: the navigation information that guides the vehicle toward the destination. Concretely, MetaDrive first computes the route from the spawn point to the destination of the ego vehicle. Then a set of checkpoints are scattered across the whole route with certain intervals. When collecting navigation information, the next two or more checkpoints will be selected and their positions will be projected to the ego vehicle’s local coordinates. The final navigation observation consists of these position vectors and other information, i.e. the direction and curvature of the road. This part is implemented in the _get_info_for_checkpoint function of Navigation Class. Note: there are several types of navigation system can be used. Please reference Navigation section for more details.

  3. Surrounding: the surrounding information is encoded by a vector containing the Lidar-like cloud points. The data is generated by the Lidar Class. We typically use 240 lasers (single-agent) and 70 lasers (multi-agent) to scan the neighboring area with radius 50 meters. Besides, it is optional to include the information of perceived vehicles such as their positions and headings represented in the ego car’s local coordinates.

The above information is normalized to [0,1] and concatenated into a state vector by the LidarStateObservation Class and fed to the RL agents. Now let’s dive into the observation class and get you familiar with the state information collection.

ImageStateObservation

_images/rgb_obs.png _images/depth_obs.jpg

MetaDrive supports visuomotor tasks by rendering 3D scenes during the training. The above figure shows the images captured by RGB camera (left) and depth camera (right). By using ImageStateObservation, the image data will be returned with Ego State and Navigation for making the driving decision. In this section, we discuss how to utilize this observation.

Before using such function in your project, please make sure the offscreen rendering is working in your machine. The setup tutorial is at Install MetaDrive with headless rendering. There are two official examples using RGB camera as observation:

python -m metadrive.examples.drive_in_single_agent_env --observation rgb_camera

# options for `--camera`: rgb main semantic depth
python -m metadrive.examples.verify_image_observation --camera rgb 

Concretely, to setup the vision-based observation, there are three steps:

  • Step 1. Set the config["image_observation"] = True to tell MetaDrive maintaining a image buffer in memory even no popup window exists.

  • Step 2. Notify the simulator what kind of sensor you are going to create in the config. A unique name should be assigned for this sensor. The image size (width and height) will be determined by the camera parameters.

  • Step 3. Set the config["vehicle_config"]["image_source"] to the sensor name. Here it should be rgb_camera.

An example creating RGB camera is as follows. It creates an RGBCamera with resolution=(width, height)=(128, 64). We then tell the engine to launch the image observation and retrieve the image from sensor whose id name rgb_camera. The image in the step observation dict o actually contains the images of latest several steps. Here we visualize the last image, which is the current frame. How many latest images are included in observation can be specified with config stack_size, which is 3 by default.

from metadrive.envs.metadrive_env import MetaDriveEnv
from metadrive.component.sensors.rgb_camera import RGBCamera
import cv2
from metadrive.policy.idm_policy import IDMPolicy
from IPython.display import Image
from metadrive.utils import generate_gif
import numpy as np
import os
sensor_size = (84, 60) if os.getenv('TEST_DOC') else (200, 100)

cfg=dict(image_observation=True, 
         vehicle_config=dict(image_source="rgb_camera"),
         sensors={"rgb_camera": (RGBCamera, *sensor_size)},
         stack_size=3,
         agent_policy=IDMPolicy # drive with IDM policy
        )

env=MetaDriveEnv(cfg)
frames = []
try:
    env.reset()
    for _ in range(1 if os.getenv('TEST_DOC') else 10000):
        # simulation
        o, r, d, _, _ = env.step([0,1])
        # rendering, the last one is the current frame
        ret=o["image"][..., -1]*255 # [0., 1.] to [0, 255]
        ret=ret.astype(np.uint8)
        frames.append(ret[..., ::-1])
        if d:
            break
    generate_gif(frames if os.getenv('TEST_DOC') else frames[-300:-50])
finally:
    env.close()
Image(open("demo.gif", 'rb').read(), width=512, height=256)
_images/8116d3c1b28c68d859b65d336b3f6bdb84abdd4d2474561eeec81f02e1fbf7b5.png

Besides the newly created RGBCamera, there is actually an existing one moving with the car, providing a third person perspective. It is created automatically when launching the rendering service, and captures and renders the content to the main window. Thus the buffer-size for this camera depends on window_size. Therefore, you can just use this camera rather than creating a new one. Just set the image_source=main_camera and specify the image size with window_size. Note that we set norm_pixel=True and thus the image pixel value will be unit8 and range from 0 to 255.

cfg=dict(image_observation=True, 
         vehicle_config=dict(image_source="main_camera"),
         norm_pixel=False,
         agent_policy=IDMPolicy, # drive with IDM policy
         show_terrain = not os.getenv('TEST_DOC'), # test
         window_size=(84, 60) if os.getenv('TEST_DOC') else (200, 100))

env=MetaDriveEnv(cfg)
frames = []
try:
    env.reset()
    for _ in range(1 if os.getenv('TEST_DOC') else 10000):
        # simulation
        o, r, d, _, _ = env.step([0,1])
        # rendering, the last one is the current frame
        ret=o["image"][..., -1]
        frames.append(ret[..., ::-1])
        if d:
            break
    generate_gif(frames if os.getenv('TEST_DOC') else frames[-300: -50])
finally:
    env.close()
Image(open("demo.gif", 'rb').read(), width=512, height=256)
_images/621867ef77c5e04f1b7c9650e94bf974e595e714e0bdedb0f759d79070389ba0.png

Rendering images and buffering the image observations consume both the GPU and CPU memory of your machine. Please be careful when using this. If you feel the visual data collection is slow, why not try our advanced offscreen render: Install MetaDrive with advanced offscreen rendering. After verifying your installation, set config["image_on_cuda"] = True to get 10x faster rollout efficiency! It will keep the rendered image on the GPU memory all the time for training, so please ensure your GPU has enough memory to store them.

More details of how to use sensors is at Sensors.

Using semantic camera as observation

from metadrive.envs import MetaDriveEnv
from metadrive.component.sensors.semantic_camera import SemanticCamera
import matplotlib.pyplot as plt
import os

size = (256, 128) if not os.getenv('TEST_DOC') else (16, 16) # for github CI

env = MetaDriveEnv(dict(
    log_level=50, # suppress log
    image_observation=True,
    show_terrain=not os.getenv('TEST_DOC'),
    sensors={"sementic_camera": [SemanticCamera, *size]},
    vehicle_config={"image_source": "sementic_camera"},
    stack_size=3,
))
obs, info = env.reset()
for _ in range(5):
    obs, r, d, t, i = env.step((0, 1))

env.close()

print({k: v.shape for k, v in obs.items()})  # Image is in shape (H, W, C, num_stacks)

plt.subplot(131)
plt.imshow(obs["image"][:, :, :, 0])
plt.subplot(132)
plt.imshow(obs["image"][:, :, :, 1])
plt.subplot(133)
plt.imshow(obs["image"][:, :, :, 2])
:device(error): Error adding inotify watch on /dev/input: No such file or directory
:device(error): Error opening directory /dev/input: No such file or directory
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[1], line 16
      6 size = (256, 128) if not os.getenv('TEST_DOC') else (16, 16) # for github CI
      8 env = MetaDriveEnv(dict(
      9     log_level=50, # suppress log
     10     image_observation=True,
   (...)
     14     stack_size=3,
     15 ))
---> 16 obs, info = env.reset()
     17 for _ in range(5):
     18     obs, r, d, t, i = env.step((0, 1))

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:523, in BaseEnv.reset(self, seed)
    521     log_level = self.config.get("log_level", logging.DEBUG if self.config.get("debug", False) else logging.INFO)
    522     set_log_level(log_level)
--> 523 self.lazy_init()  # it only works the first time when reset() is called to avoid the error when render
    524 self._reset_global_seed(seed)
    525 if self.engine is None:

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:415, in BaseEnv.lazy_init(self)
    413 if engine_initialized():
    414     return
--> 415 initialize_engine(self.config)
    416 # engine setup
    417 self.setup_engine()

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/engine_utils.py:38, in initialize_engine(env_global_config)
     35 cls = BaseEngine
     36 if cls.singleton is None:
     37     # assert cls.global_config is not None, "Set global config before initialization BaseEngine"
---> 38     cls.singleton = cls(env_global_config)
     39 else:
     40     raise PermissionError("There should be only one BaseEngine instance in one process")

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/base_engine.py:58, in BaseEngine.__init__(self, global_config)
     56 self.id_c = dict()
     57 self.try_pull_asset()
---> 58 EngineCore.__init__(self, global_config)
     59 Randomizable.__init__(self, self.global_random_seed)
     60 self.episode_step = 0

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/core/engine_core.py:299, in EngineCore.__init__(self, global_config)
    297         self.render_pipeline.daytime_mgr.time = self.global_config["daytime"]
    298 else:
--> 299     self.pbrpipe = init(
    300         msaa_samples=16,
    301         use_hardware_skinning=True,
    302         # use_normal_maps=True,
    303         use_330=False
    304     )
    306     self.sky_box = SkyBox(not self.global_config["show_skybox"])
    307     self.sky_box.attach_to_world(self.render, self.physics_world)

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:350, in init(**kwargs)
    320 def init(**kwargs):
    321     '''Initialize the PBR render pipeline
    322     :param render_node: The node to attach the shader too, defaults to `base.render` if `None`
    323     :type render_node: `panda3d.core.NodePath`
   (...)
    347     :type use_hardware_skinning: bool or None
    348     '''
--> 350     return Pipeline(**kwargs)

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:142, in Pipeline.__init__(self, render_node, window, camera_node, taskmgr, msaa_samples, max_lights, use_normal_maps, use_emission_maps, exposure, enable_fog, use_occlusion_maps, use_330, use_hardware_skinning)
    139 self._recompile_pbr()
    141 # Tonemapping
--> 142 self._setup_tonemapping()
    144 self._shader_ready = True

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:287, in Pipeline._setup_tonemapping(self)
    281 post_frag_str = _load_shader_str('tonemap.frag', defines)
    282 tonemap_shader = p3d.Shader.make(
    283     p3d.Shader.SL_GLSL,
    284     vertex=post_vert_str,
    285     fragment=post_frag_str,
    286 )
--> 287 self.tonemap_quad.set_shader(tonemap_shader)
    288 self.tonemap_quad.set_shader_input('tex', scene_tex)
    289 self.tonemap_quad.set_shader_input('exposure', self.exposure)

AttributeError: 'NoneType' object has no attribute 'set_shader'

TopDownObservation

_images/top_down_obs.png

MetaDrive also supports Top-down semantic maps. We provide a handy example to illustrate the utilization of Top-down observation in top_down_metadrive.py. You can enjoy this demo via

python -m metadrive.examples.top_down_metadrive

The following is a minimal script to use Top-down observation. The TopDownMetaDrive is a wrapper class on MetaDriveEnv which overrides observation to pygame top-down renderer. The native observation of this setting is a numpy array with shape [84, 84, 5] and all entries fall into [0, 1]. The above figure shows the semantic meaning of each channel.

from metadrive import TopDownMetaDrive

env = TopDownMetaDrive()
try:
    o,i = env.reset()
    for s in range(1, 100000):
        o, r, tm, tc, info = env.step([0, 1])
        env.render(mode="top_down")
        if tm or tc:
            break
            env.reset()
finally:
    env.close()

Customization-MultiSensor

We encourage users to design observations according to specific demand. For all environments, the observation of the agent is determined by the function env.get_single_observation, which is defined as follows:

from metadrive.utils import print_source
from metadrive.envs.metadrive_env import MetaDriveEnv
print_source(MetaDriveEnv.get_single_observation)

By default, the observation is LidarStateObservation, while image_observation=True will change it to ImageStateObservation. There are two ways to change the observation class to a customized one. The first way is creating a new environment with the get_single_observation overwritten to return your customized observation. TopDownMetaDrive is implemented in this way.

from metadrive.utils import print_source
from metadrive.envs.top_down_env import TopDownMetaDrive, TopDownSingleFrameMetaDriveEnv
print_source(TopDownSingleFrameMetaDriveEnv)
print_source(TopDownMetaDrive)

The second way is specifying the observation class in the field agent_observation of the config when creating the environment. The new observation class has to inherit from BaseObservation and implement the property observation_space (with @property decorator) and the function observe(). In the following example, we make a customized observation class which collects both vehicle states and images from three types of cameras.

from metadrive.envs.metadrive_env import MetaDriveEnv
from metadrive.component.sensors.rgb_camera import RGBCamera
from metadrive.component.sensors.semantic_camera import SemanticCamera
from metadrive.component.sensors.depth_camera import DepthCamera
import cv2
import gymnasium as gym
import numpy as np
from metadrive.policy.idm_policy import IDMPolicy
from metadrive.obs.observation_base import BaseObservation
from metadrive.obs.image_obs import ImageObservation
from metadrive.obs.state_obs import StateObservation
import os
sensor_size = (84, 60) if os.getenv('TEST_DOC') else (200, 100)

class MyObservation(BaseObservation):
    def __init__(self, config):
        super(MyObservation, self).__init__(config)
        self.rgb = ImageObservation(config, "rgb", config["norm_pixel"])
        self.depth = ImageObservation(config, "depth", config["norm_pixel"])
        self.semantic = ImageObservation(config, "semantic", config["norm_pixel"])
        self.state = StateObservation(config)

    @property
    def observation_space(self):
        os={o: getattr(self, o).observation_space for o in ["rgb", "state", "depth", "semantic"]}
        return gym.spaces.Dict(os)

    def observe(self, vehicle):
        os={o: getattr(self, o).observe() for o in ["rgb", "state", "depth", "semantic"]}
        return os

After this, we notify the engine to use this observation with the field agent_observation. Note we don’t need to specify the cfg['vehicle_config']['image_source'], as it is used in ImageStateObservation which is already overwritten with our customized observation class. In addition, you have to request corresponding sensors so the observation class can find these sensors from the engine. Actually, you are allowed to create any number of sensors according to your demand. All available sensors can be found at Sensors. At last, the config should look like:

cfg=dict(agent_policy=IDMPolicy, # drive with IDM policy
         agent_observation=MyObservation,
         image_observation=True,
         sensors={"rgb": (RGBCamera, *sensor_size),
                  "depth": (DepthCamera, *sensor_size),
                  "semantic": (SemanticCamera, *sensor_size)},
         log_level=50) # turn off log

Once finishing the setup, the following code will launch the simulation and show you the image observations returned by env.step(), which should contain images from three cameras.

from metadrive.utils import generate_gif
from IPython.display import Image

frames = []
env=MetaDriveEnv(cfg)
try:
    env.reset()
    print("Observation shape: \n", env.observation_space)
    for step in range(1 if os.getenv('TEST_DOC') else 1000):
        o, r, d, _, _ = env.step([0,1]) # simulation
        
        # visualize image observation
        o_1 = o["depth"][..., -1]
        o_1 = np.concatenate([o_1, o_1, o_1], axis=-1) # align channel
        o_2 = o["rgb"][..., -1]
        o_3 = o["semantic"][..., -1]
        ret = cv2.hconcat([o_1, o_2, o_3])*255
        ret=ret.astype(np.uint8)
        frames.append(ret[..., ::-1])
        if d:
            break
    generate_gif(frames if os.getenv('TEST_DOC') else frames[-300:-50]) # only show 250 frames
finally:
    env.close()
Observation shape: 
 Dict('depth': Box(-0.0, 1.0, (100, 200, 1, 3), float32), 'rgb': Box(-0.0, 1.0, (100, 200, 3, 3), float32), 'semantic': Box(-0.0, 1.0, (100, 200, 3, 3), float32), 'state': Box(-0.0, 1.0, (19,), float32))
Image(open("demo.gif", 'rb').read(), width=256*3, height=128)
_images/e05f3ade0fef51a302e1436235f09472fb4d0742d77e406aabb3802a03faaae5.png

Customization-MultiView

Usually, you don’t need to create a certain type of image sensor multiple times and maintain many instances in the game engine. Instead, you can create just one sensor but mount it to different positions and poses to collect multiple rendering results. In this way, the rendering can be more efficient.

In the following example, we want 4 RGB cameras to monitor the traffic density of 4 entries of an intersection. In practice, we don’t create any new RGB cameras but use the main RGB camera. In every frame, we move it to 4 target positions to collect data. The result is the same as capturing images with 4 different RGB cameras, which harms the simulation’s performance.

More sensor examples can be found in Sensors.

from metadrive.envs.metadrive_env import MetaDriveEnv
import cv2
import gymnasium as gym
import numpy as np
from metadrive.obs.observation_base import BaseObservation
from metadrive.obs.image_obs import ImageObservation
import os
from metadrive.utils import generate_gif
from IPython.display import Image
sensor_size = (1, 1) if os.getenv('TEST_DOC') else (200, 200)
class MyObservation(BaseObservation):
    def __init__(self, config):
        super(MyObservation, self).__init__(config)
        self.rgb = ImageObservation(config, "main_camera", config["norm_pixel"])

    @property
    def observation_space(self):
        os = {"entry_{}".format(idx): self.rgb.observation_space for idx in range(4)}
        os["top_down"] = self.rgb.observation_space
        return gym.spaces.Dict(os)

    def observe(self, vehicle):
        ret = {}
        # The first rendered image is the top-down view
        ret["top_down"] = self.rgb.observe()
        # The camera can be borrowed to render new images with new poses
        for idx in range(4):
            ret["entry_{}".format(idx)] = self.rgb.observe(self.engine.origin,
                                                           position=[70, 8.75, 8],
                                                           hpr=[idx * 90, -15, 0])
        return ret


env_cfg = dict(agent_observation=MyObservation,
               image_observation=True,
               window_size=sensor_size,
               map="X",
               show_terrain=not os.getenv('TEST_DOC'),
               traffic_density=0.2,
               show_interface=False,
               show_fps=False,
               traffic_mode="respawn",
               log_level=50,  # no log message
               vehicle_config=dict(image_source="main_camera"))


def reset_sensors(self):
    """
    Put the main camera to the center of the intersection at the start of each episode
    """
    self.main_camera.stop_track()
    self.main_camera.set_bird_view_pos([70, 8.75])
    self.main_camera.top_down_camera_height = 50


MetaDriveEnv.reset_sensors = reset_sensors

frames = []
env = MetaDriveEnv(env_cfg)
try:
    env.reset()
    print("Observation shape: \n", env.observation_space)
    for step in range(1 if os.getenv('TEST_DOC') else 500):
        o, r, d, _, _ = env.step([0, -1])  # simulation

        # visualize image observation
        o_1 = o["entry_0"][..., -1]
        o_2 = o["entry_1"][..., -1]
        o_3 = o["entry_2"][..., -1]
        o_4 = o["entry_3"][..., -1]
        o_5 = o["top_down"][..., -1]
        ret = cv2.hconcat([o_1, o_2, o_3, o_4, o_5]) * 255
        ret = ret.astype(np.uint8)
        frames.append(ret[::2, ::2, ::-1])
    generate_gif(frames if os.getenv('TEST_DOC') else frames[-100:])  # only show -100 frames
finally:
    env.close()
Observation shape: 
 Dict('entry_0': Box(-0.0, 1.0, (200, 200, 3, 3), float32), 'entry_1': Box(-0.0, 1.0, (200, 200, 3, 3), float32), 'entry_2': Box(-0.0, 1.0, (200, 200, 3, 3), float32), 'entry_3': Box(-0.0, 1.0, (200, 200, 3, 3), float32), 'top_down': Box(-0.0, 1.0, (200, 200, 3, 3), float32))
Image(open("demo.gif", 'rb').read(), width=100*5, height=100)
_images/ff4f9bbbaac8f6f9151f7ec5b6dd775d69063cbb97432de8621a3424e6bcb650.png

FAQ

Can I use LidarState observation but also render the images at the same time?

Yes! You can stick to the original observation by passing config["agent_observation"]=LidarStateObservation but still maintaining the RGB camera with config["sensors"]=dict(rgb_camera=(RGBCamera, ...)). See this example:

from metadrive.envs.metadrive_env import MetaDriveEnv
from metadrive.obs.state_obs import LidarStateObservation
from metadrive.component.sensors.rgb_camera import RGBCamera
import os
test_doc = os.getenv('TEST_DOC')
sensor_size = (84, 60) if test_doc else (200, 100)

env = MetaDriveEnv(config=dict(
    use_render=False,
    agent_observation=LidarStateObservation,
    image_observation=True,
    norm_pixel=False,
    sensors=dict(rgb_camera=(RGBCamera, *sensor_size)),
))

obs, info = env.reset()

print("Observation shape: ", obs.shape)

image = env.engine.get_sensor("rgb_camera").perceive(to_float=False)
image = image[..., [2, 1, 0]]

if not test_doc:
    import matplotlib.pyplot as plt
    plt.imshow(image)
    plt.show()
[INFO] Environment: MetaDriveEnv
[INFO] MetaDrive version: 0.4.2.3
[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector(), rgb_camera: RGBCamera(512, 256)]
[INFO] Render Mode: offscreen
[INFO] Horizon (Max steps per agent): None
[WARNING] You have set norm_pixel = False, which means the observation will be uint8 values in [0, 255]. Please make sure you have parsed them later before feeding them to network! (metadrive_env.py:113)
[INFO] Assets version: 0.4.2.3
[INFO] Known Pipes: glxGraphicsPipe
[INFO] Assets version: 0.4.2.3
[INFO] Known Pipes: glxGraphicsPipe
[WARNING] You are using too large buffer! The height is 256, and width is 512. It may lower the sample efficiency! Consider reducing buffer size or use cuda image by set [image_on_cuda=True]. (base_camera.py:49)
[INFO] Start Scenario Index: 0, Num Scenarios : 1
Observation shape:  (259,)
_images/a66dca16f10995cf45eff68b2f623d10269438b3a489516656706ac88118dc0b.png