Multi-sensor Rendering

Click and Open In Colab

In our SimGen project, we use MetaDrive’s ScenarioEnv to generate multiple sensor rendering, namely the Semantic Map, Depth Image and RGB Image, to condition the SimGen model.

In this example, we will demonstrate the minimal example on how to achive this in MetaDrive’s side.

The code working with SimGen can be found here.

Utilities

import time

import cv2
import gymnasium as gym
import mediapy as media
import numpy as np
import tqdm
from PIL import Image
from PIL import ImageDraw, ImageFont
from metadrive.component.sensors.depth_camera import DepthCamera
from metadrive.component.sensors.rgb_camera import RGBCamera
from metadrive.component.sensors.semantic_camera import SemanticCamera
from metadrive.engine.asset_loader import AssetLoader
from metadrive.envs.scenario_env import ScenarioEnv
from metadrive.obs.image_obs import ImageObservation
from metadrive.obs.observation_base import BaseObservation
from metadrive.policy.replay_policy import ReplayEgoCarPolicy


def postprocess_semantic_image(image):
    """
    In order to align with the Segformer's output, we modify the output color of the semantic image from MetaDrive.
    """
    # customized
    old_LANE_LINE = (255, 255, 255)
    old_CROSSWALK = (55, 176, 189)

    new_LANE_LINE = (128, 64, 128)
    new_CROSSWALK = (128, 64, 128)

    # Change the color of the lane line and crosswalk
    assert image.dtype == np.uint8

    is_lane_line = (
            (image[..., 0] == old_LANE_LINE[0]) &
            (image[..., 1] == old_LANE_LINE[1]) &
            (image[..., 2] == old_LANE_LINE[2])
    )
    image[is_lane_line] = new_LANE_LINE

    is_crosswalk = (
            (image[..., 0] == old_CROSSWALK[0]) &
            (image[..., 1] == old_CROSSWALK[1]) &
            (image[..., 2] == old_CROSSWALK[2])
    )
    image[is_crosswalk] = new_CROSSWALK

    return image

Customized Observation Class

class SimGenObservation(BaseObservation):
    def __init__(self, config):
        super(SimGenObservation, self).__init__(config)
        assert config["norm_pixel"] is False
        assert config["stack_size"] == 1
        self.seg_obs = ImageObservation(config, "seg_camera", config["norm_pixel"])
        self.rgb_obs = ImageObservation(config, "rgb_camera", config["norm_pixel"])
        self.depth_obs = ImageObservation(config, "depth_camera", config["norm_pixel"])

    @property
    def observation_space(self):
        os = dict(
            rgb=self.rgb_obs.observation_space,
            seg=self.seg_obs.observation_space,
            depth=self.depth_obs.observation_space,
        )
        return gym.spaces.Dict(os)

    def observe(self, vehicle):
        ret = {}

        seg_cam = self.engine.get_sensor("seg_camera").cam
        agent = seg_cam.getParent()
        original_position = seg_cam.getPos()
        heading, pitch, roll = seg_cam.getHpr()
        seg_img = self.seg_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        assert seg_img.ndim == 4
        assert seg_img.shape[-1] == 1
        assert seg_img.dtype == np.uint8
        # Do some postprocessing here
        seg_img = seg_img[..., 0]
        before = seg_img.copy()
        seg_img = postprocess_semantic_image(seg_img)
        seg_img = seg_img[..., ::-1]  # BGR -> RGB
        ret["seg"] = seg_img

        depth_cam = self.engine.get_sensor("depth_camera").cam
        agent = depth_cam.getParent()
        original_position = depth_cam.getPos()
        heading, pitch, roll = depth_cam.getHpr()
        depth_img = self.depth_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        assert depth_img.ndim == 4
        assert depth_img.shape[-1] == 1
        assert depth_img.dtype == np.uint8
        depth_img = depth_img[..., 0]
        # before = depth_img.copy()
        depth_img = cv2.bitwise_not(depth_img)
        depth_img = depth_img[..., None]
        ret["depth"] = depth_img

        rgb_cam = self.engine.get_sensor("rgb_camera").cam
        agent = rgb_cam.getParent()
        original_position = rgb_cam.getPos()
        heading, pitch, roll = rgb_cam.getHpr()
        rgb_img = self.rgb_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        assert rgb_img.ndim == 4
        assert rgb_img.shape[-1] == 1
        assert rgb_img.dtype == np.uint8
        rgb_img = rgb_img[..., 0]
        # Change the color from BGR to RGB
        rgb_img = rgb_img[..., ::-1]
        ret["rgb"] = rgb_img

        return ret

Setup ScenarioEnv

# ===== MetaDrive Setup =====
import os
sensor_size = (16, 16) if os.getenv('TEST_DOC') else (800, 450)

env = ScenarioEnv(
    {
        'agent_observation': SimGenObservation,

        # To enable onscreen rendering, set this config to True.
        "use_render": False,

        # !!!!! To enable offscreen rendering, set this config to True !!!!!
        "image_observation": True,

        "norm_pixel": False,
        "stack_size": 1,

        # ===== The scenario and MetaDrive config =====
        "agent_policy": ReplayEgoCarPolicy,
        "no_traffic": False,
        "sequential_seed": True,
        "reactive_traffic": False,
        "num_scenarios": 9,
        "horizon": 1000,
        "no_static_vehicles": False,
        "agent_configs": {
            "default_agent": dict(use_special_color=True, vehicle_model="varying_dynamics_bounding_box")
        },
        "vehicle_config": dict(
            show_navi_mark=False,
            show_line_to_dest=False,
            lidar=dict(num_lasers=120, distance=50),
            lane_line_detector=dict(num_lasers=0, distance=50),
            side_detector=dict(num_lasers=12, distance=50),
        ),
        # "use_bounding_box": True,
        "data_directory": AssetLoader.file_path("nuscenes", unix_style=False),
        "height_scale": 1,

        "set_static": True,

        # ===== Set some sensor and visualization configs =====
        "daytime": "08:10",
        "window_size": (sensor_size[0], sensor_size[1]),
        "camera_dist": 0.8,  # 0.8, 1.71
        "camera_height": 1.5,  # 1.5
        "camera_pitch": None,
        "camera_fov": 66,  # 60, 66
        "sensors": dict(
            depth_camera=(DepthCamera, sensor_size[0], sensor_size[1]),
            rgb_camera=(RGBCamera, sensor_size[0], sensor_size[1]),
            seg_camera=(SemanticCamera,sensor_size[0], sensor_size[1]),
        ),

        # ===== Remove useless items in the images =====
        "show_logo": False,
        "show_fps": False,
        "show_interface": True,
        "disable_collision": True,
        "force_destroy": True,
    }
)
[INFO] Environment: ScenarioEnv
[INFO] MetaDrive version: 0.4.3
[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector(), depth_camera: DepthCamera(800, 450), rgb_camera: RGBCamera(800, 450), seg_camera: SemanticCamera(800, 450)]
[INFO] Render Mode: offscreen
[INFO] Horizon (Max steps per agent): 1000

Rollout

if not os.getenv('TEST_DOC'):
    skip_steps = 1
    fps = 10
    
    frames = []
    
    env.reset()
    scenario = env.engine.data_manager.current_scenario
    scenario_id = scenario['id']
    print(
        "Current scenario ID {}, dataset version {}, len: {}".format(
            scenario_id, scenario['version'], scenario['length']
        )
    )
    horizon = scenario['length']
    for t in tqdm.trange(horizon):
        o, r, d, _, _ = env.step([1, 0.88])
        if t % skip_steps == 0:
            depth_img = Image.fromarray(o["depth"].repeat(3, axis=-1), mode="RGB")
            seg_img = Image.fromarray(o["seg"], mode="RGB")
            rgb_img = Image.fromarray(o["rgb"], mode="RGB")
    
            vis = cv2.hconcat([o["seg"], o["depth"].repeat(3, axis=-1)])
            h, w, _ = o["rgb"].shape
            vis_w = vis.shape[1]
            image = cv2.resize(o["rgb"], (vis_w, int(h * vis_w / w)))
            vis = cv2.vconcat([vis, image])
    
            # Quick visualization:
            # import matplotlib.pyplot as plt;plt.imshow(vis);plt.show()
            frames.append(vis)
        env.close()
    
    media.show_video(frames, fps=fps, width=600)
[INFO] Assets version: 0.4.3
:device(error): Error adding inotify watch on /dev/input: No such file or directory
:device(error): Error opening directory /dev/input: No such file or directory
[INFO] Known Pipes: glxGraphicsPipe
[INFO] Assets version: 0.4.3
[INFO] Known Pipes: glxGraphicsPipe
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[4], line 7
      3 fps = 10
      5 frames = []
----> 7 env.reset()
      8 scenario = env.engine.data_manager.current_scenario
      9 scenario_id = scenario['id']

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:523, in BaseEnv.reset(self, seed)
    521     log_level = self.config.get("log_level", logging.DEBUG if self.config.get("debug", False) else logging.INFO)
    522     set_log_level(log_level)
--> 523 self.lazy_init()  # it only works the first time when reset() is called to avoid the error when render
    524 self._reset_global_seed(seed)
    525 if self.engine is None:

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:415, in BaseEnv.lazy_init(self)
    413 if engine_initialized():
    414     return
--> 415 initialize_engine(self.config)
    416 # engine setup
    417 self.setup_engine()

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/engine_utils.py:38, in initialize_engine(env_global_config)
     35 cls = BaseEngine
     36 if cls.singleton is None:
     37     # assert cls.global_config is not None, "Set global config before initialization BaseEngine"
---> 38     cls.singleton = cls(env_global_config)
     39 else:
     40     raise PermissionError("There should be only one BaseEngine instance in one process")

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/base_engine.py:58, in BaseEngine.__init__(self, global_config)
     56 self.id_c = dict()
     57 self.try_pull_asset()
---> 58 EngineCore.__init__(self, global_config)
     59 Randomizable.__init__(self, self.global_random_seed)
     60 self.episode_step = 0

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/core/engine_core.py:313, in EngineCore.__init__(self, global_config)
    307     self.pbrpipe = init(
    308         msaa_samples=4,
    309         # use_hardware_skinning=True,
    310         use_330=True
    311     )
    312 else:
--> 313     self.pbrpipe = init(
    314         msaa_samples=16,
    315         use_hardware_skinning=True,
    316         # use_normal_maps=True,
    317         use_330=False
    318     )
    320 self.sky_box = SkyBox(not self.global_config["show_skybox"])
    321 self.sky_box.attach_to_world(self.render, self.physics_world)

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:350, in init(**kwargs)
    320 def init(**kwargs):
    321     '''Initialize the PBR render pipeline
    322     :param render_node: The node to attach the shader too, defaults to `base.render` if `None`
    323     :type render_node: `panda3d.core.NodePath`
   (...)    347     :type use_hardware_skinning: bool or None
    348     '''
--> 350     return Pipeline(**kwargs)

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:142, in Pipeline.__init__(self, render_node, window, camera_node, taskmgr, msaa_samples, max_lights, use_normal_maps, use_emission_maps, exposure, enable_fog, use_occlusion_maps, use_330, use_hardware_skinning)
    139 self._recompile_pbr()
    141 # Tonemapping
--> 142 self._setup_tonemapping()
    144 self._shader_ready = True

File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:287, in Pipeline._setup_tonemapping(self)
    281 post_frag_str = _load_shader_str('tonemap.frag', defines)
    282 tonemap_shader = p3d.Shader.make(
    283     p3d.Shader.SL_GLSL,
    284     vertex=post_vert_str,
    285     fragment=post_frag_str,
    286 )
--> 287 self.tonemap_quad.set_shader(tonemap_shader)
    288 self.tonemap_quad.set_shader_input('tex', scene_tex)
    289 self.tonemap_quad.set_shader_input('exposure', self.exposure)

AttributeError: 'NoneType' object has no attribute 'set_shader'