Multi-sensor Rendering
In our SimGen project, we use MetaDrive’s ScenarioEnv to render multiple sensor outputs, namely the Semantic Map, Depth Image and RGB Image, which are used to condition the SimGen model.
In this example, we demonstrate a minimal way to achieve this on the MetaDrive side.
The code working with SimGen can be found here.
Utilities
import time
import cv2
import gymnasium as gym
import mediapy as media
import numpy as np
import tqdm
from PIL import Image
from PIL import ImageDraw, ImageFont
from metadrive.component.sensors.depth_camera import DepthCamera
from metadrive.component.sensors.rgb_camera import RGBCamera
from metadrive.component.sensors.semantic_camera import SemanticCamera
from metadrive.engine.asset_loader import AssetLoader
from metadrive.envs.scenario_env import ScenarioEnv
from metadrive.obs.image_obs import ImageObservation
from metadrive.obs.observation_base import BaseObservation
from metadrive.policy.replay_policy import ReplayEgoCarPolicy
def postprocess_semantic_image(image):
    """
    Recolor MetaDrive's semantic image so that it aligns with the Segformer's output.

    Every pixel whose first three channels equal the lane-line color or the
    crosswalk color is repainted with (128, 64, 128). The array is modified
    in place and the very same array object is returned.

    :param image: uint8 array whose last axis holds the color channels.
    :return: ``image`` itself, after recoloring.
    """
    assert image.dtype == np.uint8

    # customized: (color emitted by MetaDrive, replacement color), applied in order
    recolor_rules = (
        ((255, 255, 255), (128, 64, 128)),  # lane line
        ((55, 176, 189), (128, 64, 128)),  # crosswalk
    )

    for source_color, target_color in recolor_rules:
        c0, c1, c2 = source_color
        # A pixel matches only when all three channels match the source color.
        matches = (image[..., 0] == c0) & (image[..., 1] == c1) & (image[..., 2] == c2)
        image[matches] = target_color

    return image
Customized Observation Class
class SimGenObservation(BaseObservation):
    """
    Observation that bundles the semantic, depth and RGB camera images of one step.

    ``observe`` returns a dict with the keys ``"seg"``, ``"depth"`` and ``"rgb"``.
    The config must have ``norm_pixel`` set to ``False`` and ``stack_size`` set to 1.
    """

    def __init__(self, config):
        super(SimGenObservation, self).__init__(config)
        assert config["norm_pixel"] is False
        assert config["stack_size"] == 1
        self.seg_obs = ImageObservation(config, "seg_camera", config["norm_pixel"])
        self.rgb_obs = ImageObservation(config, "rgb_camera", config["norm_pixel"])
        self.depth_obs = ImageObservation(config, "depth_camera", config["norm_pixel"])

    @property
    def observation_space(self):
        """Dict space combining the spaces of the three underlying image observations."""
        # NOTE(review): these are the spaces reported by the raw ImageObservation
        # objects, whereas observe() strips the trailing stack axis -- confirm that
        # no consumer validates observations against this space.
        spaces = dict(
            rgb=self.rgb_obs.observation_space,
            seg=self.seg_obs.observation_space,
            depth=self.depth_obs.observation_space,
        )
        return gym.spaces.Dict(spaces)

    def _grab_frame(self, sensor_name, image_obs):
        """Observe ``image_obs`` from the named sensor's current pose and drop the stack axis."""
        cam = self.engine.get_sensor(sensor_name).cam
        agent = cam.getParent()
        original_position = cam.getPos()
        heading, pitch, roll = cam.getHpr()
        frame = image_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])
        # stack_size is asserted to be 1 in __init__, so the last axis holds one frame.
        assert frame.ndim == 4
        assert frame.shape[-1] == 1
        assert frame.dtype == np.uint8
        return frame[..., 0]

    def observe(self, vehicle):
        """
        Capture the semantic, depth and RGB images for the current step.

        :param vehicle: not used here; each camera is observed from the node it is
            currently attached to.
        :return: dict with the uint8 arrays ``"seg"``, ``"depth"`` and ``"rgb"``.
        """
        ret = {}

        # Semantic map: remap the colors, then flip the channel order.
        seg_img = self._grab_frame("seg_camera", self.seg_obs)
        seg_img = postprocess_semantic_image(seg_img)
        seg_img = seg_img[..., ::-1]  # BGR -> RGB
        ret["seg"] = seg_img

        # Depth: invert the values and restore the trailing channel axis afterwards.
        depth_img = self._grab_frame("depth_camera", self.depth_obs)
        depth_img = cv2.bitwise_not(depth_img)
        depth_img = depth_img[..., None]
        ret["depth"] = depth_img

        # RGB: change the color from BGR to RGB.
        rgb_img = self._grab_frame("rgb_camera", self.rgb_obs)
        rgb_img = rgb_img[..., ::-1]
        ret["rgb"] = rgb_img

        return ret
Setup ScenarioEnv
# ===== MetaDrive Setup =====
import os

# Use tiny images when the TEST_DOC environment variable is set, full-size ones otherwise.
if os.getenv('TEST_DOC'):
    sensor_size = (16, 16)
else:
    sensor_size = (800, 450)
sensor_width, sensor_height = sensor_size

# Per-vehicle settings: hide navigation hints and configure the range sensors.
vehicle_config = dict(
    show_navi_mark=False,
    show_line_to_dest=False,
    lidar=dict(num_lasers=120, distance=50),
    lane_line_detector=dict(num_lasers=0, distance=50),
    side_detector=dict(num_lasers=12, distance=50),
)

# The three cameras read by SimGenObservation, all rendered at the same resolution.
sensors = dict(
    depth_camera=(DepthCamera, sensor_width, sensor_height),
    rgb_camera=(RGBCamera, sensor_width, sensor_height),
    seg_camera=(SemanticCamera, sensor_width, sensor_height),
)

env_config = {
    "agent_observation": SimGenObservation,

    # To enable onscreen rendering, set this config to True.
    "use_render": False,

    # !!!!! To enable offscreen rendering, set this config to True !!!!!
    "image_observation": True,
    "norm_pixel": False,
    "stack_size": 1,

    # ===== The scenario and MetaDrive config =====
    "agent_policy": ReplayEgoCarPolicy,
    "no_traffic": False,
    "sequential_seed": True,
    "reactive_traffic": False,
    "num_scenarios": 9,
    "horizon": 1000,
    "no_static_vehicles": False,
    "agent_configs": {
        "default_agent": dict(use_special_color=True, vehicle_model="varying_dynamics_bounding_box"),
    },
    "vehicle_config": vehicle_config,
    # "use_bounding_box": True,
    "data_directory": AssetLoader.file_path("nuscenes", unix_style=False),
    "height_scale": 1,
    "set_static": True,

    # ===== Set some sensor and visualization configs =====
    "daytime": "08:10",
    "window_size": (sensor_width, sensor_height),
    "camera_dist": 0.8,  # 0.8, 1.71
    "camera_height": 1.5,  # 1.5
    "camera_pitch": None,
    "camera_fov": 66,  # 60, 66
    "sensors": sensors,

    # ===== Remove useless items in the images =====
    "show_logo": False,
    "show_fps": False,
    "show_interface": True,
    "disable_collision": True,
    "force_destroy": True,
}

env = ScenarioEnv(env_config)
[INFO] Environment: ScenarioEnv
[INFO] MetaDrive version: 0.4.3
[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector(), depth_camera: DepthCamera(800, 450), rgb_camera: RGBCamera(800, 450), seg_camera: SemanticCamera(800, 450)]
[INFO] Render Mode: offscreen
[INFO] Horizon (Max steps per agent): 1000
Rollout
# Roll out one scenario and show the three sensor streams as a video.
# The whole rollout is skipped when the TEST_DOC environment variable is set.
if not os.getenv('TEST_DOC'):
    skip_steps = 1  # keep one frame every `skip_steps` simulation steps
    fps = 10

    frames = []

    try:
        env.reset()
        scenario = env.engine.data_manager.current_scenario
        scenario_id = scenario['id']
        print(
            "Current scenario ID {}, dataset version {}, len: {}".format(
                scenario_id, scenario['version'], scenario['length']
            )
        )

        horizon = scenario['length']
        for t in tqdm.trange(horizon):
            # NOTE(review): the env is configured with ReplayEgoCarPolicy, so this
            # action is presumably ignored -- confirm.
            o, *_ = env.step([1, 0.88])
            if t % skip_steps != 0:
                continue

            # Top row: semantic map next to the depth image (replicated to 3 channels).
            depth_3ch = o["depth"].repeat(3, axis=-1)
            vis = cv2.hconcat([o["seg"], depth_3ch])

            # Bottom row: RGB image scaled to the width of the top row, keeping its aspect ratio.
            h, w, _ = o["rgb"].shape
            vis_w = vis.shape[1]
            image = cv2.resize(o["rgb"], (vis_w, int(h * vis_w / w)))
            vis = cv2.vconcat([vis, image])

            # Quick visualization:
            # import matplotlib.pyplot as plt;plt.imshow(vis);plt.show()

            frames.append(vis)
    finally:
        # Always release the simulator, even if reset() or a step raised.
        env.close()

    media.show_video(frames, fps=fps, width=600)
[INFO] Assets version: 0.4.3
:device(error): Error adding inotify watch on /dev/input: No such file or directory
:device(error): Error opening directory /dev/input: No such file or directory
[INFO] Known Pipes: glxGraphicsPipe
[INFO] Assets version: 0.4.3
[INFO] Known Pipes: glxGraphicsPipe
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[4], line 7
3 fps = 10
5 frames = []
----> 7 env.reset()
8 scenario = env.engine.data_manager.current_scenario
9 scenario_id = scenario['id']
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:523, in BaseEnv.reset(self, seed)
521 log_level = self.config.get("log_level", logging.DEBUG if self.config.get("debug", False) else logging.INFO)
522 set_log_level(log_level)
--> 523 self.lazy_init() # it only works the first time when reset() is called to avoid the error when render
524 self._reset_global_seed(seed)
525 if self.engine is None:
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/envs/base_env.py:415, in BaseEnv.lazy_init(self)
413 if engine_initialized():
414 return
--> 415 initialize_engine(self.config)
416 # engine setup
417 self.setup_engine()
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/engine_utils.py:38, in initialize_engine(env_global_config)
35 cls = BaseEngine
36 if cls.singleton is None:
37 # assert cls.global_config is not None, "Set global config before initialization BaseEngine"
---> 38 cls.singleton = cls(env_global_config)
39 else:
40 raise PermissionError("There should be only one BaseEngine instance in one process")
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/base_engine.py:58, in BaseEngine.__init__(self, global_config)
56 self.id_c = dict()
57 self.try_pull_asset()
---> 58 EngineCore.__init__(self, global_config)
59 Randomizable.__init__(self, self.global_random_seed)
60 self.episode_step = 0
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/engine/core/engine_core.py:313, in EngineCore.__init__(self, global_config)
307 self.pbrpipe = init(
308 msaa_samples=4,
309 # use_hardware_skinning=True,
310 use_330=True
311 )
312 else:
--> 313 self.pbrpipe = init(
314 msaa_samples=16,
315 use_hardware_skinning=True,
316 # use_normal_maps=True,
317 use_330=False
318 )
320 self.sky_box = SkyBox(not self.global_config["show_skybox"])
321 self.sky_box.attach_to_world(self.render, self.physics_world)
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:350, in init(**kwargs)
320 def init(**kwargs):
321 '''Initialize the PBR render pipeline
322 :param render_node: The node to attach the shader too, defaults to `base.render` if `None`
323 :type render_node: `panda3d.core.NodePath`
(...) 347 :type use_hardware_skinning: bool or None
348 '''
--> 350 return Pipeline(**kwargs)
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:142, in Pipeline.__init__(self, render_node, window, camera_node, taskmgr, msaa_samples, max_lights, use_normal_maps, use_emission_maps, exposure, enable_fog, use_occlusion_maps, use_330, use_hardware_skinning)
139 self._recompile_pbr()
141 # Tonemapping
--> 142 self._setup_tonemapping()
144 self._shader_ready = True
File ~/checkouts/readthedocs.org/user_builds/metadrive-simulator/envs/latest/lib/python3.11/site-packages/metadrive/third_party/simplepbr/__init__.py:287, in Pipeline._setup_tonemapping(self)
281 post_frag_str = _load_shader_str('tonemap.frag', defines)
282 tonemap_shader = p3d.Shader.make(
283 p3d.Shader.SL_GLSL,
284 vertex=post_vert_str,
285 fragment=post_frag_str,
286 )
--> 287 self.tonemap_quad.set_shader(tonemap_shader)
288 self.tonemap_quad.set_shader_input('tex', scene_tex)
289 self.tonemap_quad.set_shader_input('exposure', self.exposure)
AttributeError: 'NoneType' object has no attribute 'set_shader'