{ "cells": [ { "cell_type": "markdown", "id": "8608d8b2", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "# Multi-sensor Rendering\n", "\n", "[![Click and Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/metadriverse/metadrive/blob/main/documentation/source/simgen_render.ipynb)\n", "\n", "\n", "In our [SimGen project](https://metadriverse.github.io/simgen/), we use MetaDrive's ScenarioEnv to generate multiple sensor rendering, namely the Semantic Map, Depth Image and RGB Image, to condition the SimGen model.\n", "\n", "In this example, we will demonstrate the minimal example on how to achive this in MetaDrive's side.\n", "\n", "The code working with SimGen can be found [here](https://github.com/metadriverse/SimGen/blob/0daf05b7f0a8d2a582cc4e054720b60f328fea50/metadrive_simgen.py).\n" ] }, { "cell_type": "markdown", "id": "95c31ea0-fff0-4df8-a61e-9c0b40b40a9d", "metadata": {}, "source": [ "### Utilities" ] }, { "cell_type": "code", "execution_count": null, "id": "b0c054bf-510e-4689-8c7c-b102a374f6bd", "metadata": { "tags": [] }, "outputs": [], "source": [ "import time\n", "\n", "import cv2\n", "import gymnasium as gym\n", "import mediapy as media\n", "import numpy as np\n", "import tqdm\n", "from PIL import Image\n", "from PIL import ImageDraw, ImageFont\n", "from metadrive.component.sensors.depth_camera import DepthCamera\n", "from metadrive.component.sensors.rgb_camera import RGBCamera\n", "from metadrive.component.sensors.semantic_camera import SemanticCamera\n", "from metadrive.engine.asset_loader import AssetLoader\n", "from metadrive.envs.scenario_env import ScenarioEnv\n", "from metadrive.obs.image_obs import ImageObservation\n", "from metadrive.obs.observation_base import BaseObservation\n", "from metadrive.policy.replay_policy import ReplayEgoCarPolicy\n", "\n", "\n", "def postprocess_semantic_image(image):\n", " \"\"\"\n", " In order to align with the Segformer's output, we modify the output color of the semantic image from MetaDrive.\n", " \"\"\"\n", " # customized\n", " old_LANE_LINE = (255, 255, 255)\n", " old_CROSSWALK = (55, 176, 189)\n", "\n", " new_LANE_LINE = (128, 64, 128)\n", " new_CROSSWALK = (128, 64, 128)\n", "\n", " # Change the color of the lane line and crosswalk\n", " assert image.dtype == np.uint8\n", "\n", " is_lane_line = (\n", " (image[..., 0] == old_LANE_LINE[0]) &\n", " (image[..., 1] == old_LANE_LINE[1]) &\n", " (image[..., 2] == old_LANE_LINE[2])\n", " )\n", " image[is_lane_line] = new_LANE_LINE\n", "\n", " is_crosswalk = (\n", " (image[..., 0] == old_CROSSWALK[0]) &\n", " (image[..., 1] == old_CROSSWALK[1]) &\n", " (image[..., 2] == old_CROSSWALK[2])\n", " )\n", " image[is_crosswalk] = new_CROSSWALK\n", "\n", " return image\n" ] }, { "cell_type": "markdown", "id": "02f1d8de-9338-4aae-a901-4f5f864b3fee", "metadata": {}, "source": [ "### Customized Observation Class" ] }, { "cell_type": "code", "execution_count": null, "id": "1ca03090-cf92-47e8-9476-e331c7eef62c", "metadata": {}, "outputs": [], "source": [ "class SimGenObservation(BaseObservation):\n", " def __init__(self, config):\n", " super(SimGenObservation, self).__init__(config)\n", " assert config[\"norm_pixel\"] is False\n", " assert config[\"stack_size\"] == 1\n", " self.seg_obs = ImageObservation(config, \"seg_camera\", config[\"norm_pixel\"])\n", " self.rgb_obs = ImageObservation(config, \"rgb_camera\", config[\"norm_pixel\"])\n", " self.depth_obs = ImageObservation(config, \"depth_camera\", config[\"norm_pixel\"])\n", "\n", " @property\n", " def observation_space(self):\n", " os = dict(\n", " rgb=self.rgb_obs.observation_space,\n", " seg=self.seg_obs.observation_space,\n", " depth=self.depth_obs.observation_space,\n", " )\n", " return gym.spaces.Dict(os)\n", "\n", " def observe(self, vehicle):\n", " ret = {}\n", "\n", " seg_cam = self.engine.get_sensor(\"seg_camera\").cam\n", " agent = seg_cam.getParent()\n", " original_position = seg_cam.getPos()\n", " heading, pitch, roll = seg_cam.getHpr()\n", " seg_img = self.seg_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])\n", " assert seg_img.ndim == 4\n", " assert seg_img.shape[-1] == 1\n", " assert seg_img.dtype == np.uint8\n", " # Do some postprocessing here\n", " seg_img = seg_img[..., 0]\n", " before = seg_img.copy()\n", " seg_img = postprocess_semantic_image(seg_img)\n", " seg_img = seg_img[..., ::-1] # BGR -> RGB\n", " ret[\"seg\"] = seg_img\n", "\n", " depth_cam = self.engine.get_sensor(\"depth_camera\").cam\n", " agent = depth_cam.getParent()\n", " original_position = depth_cam.getPos()\n", " heading, pitch, roll = depth_cam.getHpr()\n", " depth_img = self.depth_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])\n", " assert depth_img.ndim == 4\n", " assert depth_img.shape[-1] == 1\n", " assert depth_img.dtype == np.uint8\n", " depth_img = depth_img[..., 0]\n", " # before = depth_img.copy()\n", " depth_img = cv2.bitwise_not(depth_img)\n", " depth_img = depth_img[..., None]\n", " ret[\"depth\"] = depth_img\n", "\n", " rgb_cam = self.engine.get_sensor(\"rgb_camera\").cam\n", " agent = rgb_cam.getParent()\n", " original_position = rgb_cam.getPos()\n", " heading, pitch, roll = rgb_cam.getHpr()\n", " rgb_img = self.rgb_obs.observe(agent, position=original_position, hpr=[heading, pitch, roll])\n", " assert rgb_img.ndim == 4\n", " assert rgb_img.shape[-1] == 1\n", " assert rgb_img.dtype == np.uint8\n", " rgb_img = rgb_img[..., 0]\n", " # Change the color from BGR to RGB\n", " rgb_img = rgb_img[..., ::-1]\n", " ret[\"rgb\"] = rgb_img\n", "\n", " return ret\n" ] }, { "cell_type": "markdown", "id": "7aecaa0d-466f-4492-8910-f6eef2c71e1d", "metadata": {}, "source": [ "### Setup ScenarioEnv" ] }, { "cell_type": "code", "execution_count": null, "id": "294b0ed1-a7d8-44f9-b20e-2b5671adeec6", "metadata": { "tags": [] }, "outputs": [], "source": [ "# ===== MetaDrive Setup =====\n", "import os\n", "sensor_size = (16, 16) if os.getenv('TEST_DOC') else (800, 450)\n", "\n", "env = ScenarioEnv(\n", " {\n", " 'agent_observation': SimGenObservation,\n", "\n", " # To enable onscreen rendering, set this config to True.\n", " \"use_render\": False,\n", "\n", " # !!!!! To enable offscreen rendering, set this config to True !!!!!\n", " \"image_observation\": True,\n", "\n", " \"norm_pixel\": False,\n", " \"stack_size\": 1,\n", "\n", " # ===== The scenario and MetaDrive config =====\n", " \"agent_policy\": ReplayEgoCarPolicy,\n", " \"no_traffic\": False,\n", " \"sequential_seed\": True,\n", " \"reactive_traffic\": False,\n", " \"num_scenarios\": 9,\n", " \"horizon\": 1000,\n", " \"no_static_vehicles\": False,\n", " \"agent_configs\": {\n", " \"default_agent\": dict(use_special_color=True, vehicle_model=\"varying_dynamics_bounding_box\")\n", " },\n", " \"vehicle_config\": dict(\n", " show_navi_mark=False,\n", " show_line_to_dest=False,\n", " lidar=dict(num_lasers=120, distance=50),\n", " lane_line_detector=dict(num_lasers=0, distance=50),\n", " side_detector=dict(num_lasers=12, distance=50),\n", " ),\n", " # \"use_bounding_box\": True,\n", " \"data_directory\": AssetLoader.file_path(\"nuscenes\", unix_style=False),\n", " \"height_scale\": 1,\n", "\n", " \"set_static\": True,\n", "\n", " # ===== Set some sensor and visualization configs =====\n", " \"daytime\": \"08:10\",\n", " \"window_size\": (sensor_size[0], sensor_size[1]),\n", " \"camera_dist\": 0.8, # 0.8, 1.71\n", " \"camera_height\": 1.5, # 1.5\n", " \"camera_pitch\": None,\n", " \"camera_fov\": 66, # 60, 66\n", " \"sensors\": dict(\n", " depth_camera=(DepthCamera, sensor_size[0], sensor_size[1]),\n", " rgb_camera=(RGBCamera, sensor_size[0], sensor_size[1]),\n", " seg_camera=(SemanticCamera,sensor_size[0], sensor_size[1]),\n", " ),\n", "\n", " # ===== Remove useless items in the images =====\n", " \"show_logo\": False,\n", " \"show_fps\": False,\n", " \"show_interface\": True,\n", " \"disable_collision\": True,\n", " \"force_destroy\": True,\n", " }\n", ")" ] }, { "cell_type": "markdown", "id": "1964a9f4-1583-4189-b072-efc83bc0adf0", "metadata": {}, "source": [ "### Rollout" ] }, { "cell_type": "code", "execution_count": null, "id": "c394d21c0f053a32", "metadata": {}, "outputs": [], "source": [ "if not os.getenv('TEST_DOC'):\n", " skip_steps = 1\n", " fps = 10\n", " \n", " frames = []\n", " \n", " env.reset()\n", " scenario = env.engine.data_manager.current_scenario\n", " scenario_id = scenario['id']\n", " print(\n", " \"Current scenario ID {}, dataset version {}, len: {}\".format(\n", " scenario_id, scenario['version'], scenario['length']\n", " )\n", " )\n", " horizon = scenario['length']\n", " for t in tqdm.trange(horizon):\n", " o, r, d, _, _ = env.step([1, 0.88])\n", " if t % skip_steps == 0:\n", " depth_img = Image.fromarray(o[\"depth\"].repeat(3, axis=-1), mode=\"RGB\")\n", " seg_img = Image.fromarray(o[\"seg\"], mode=\"RGB\")\n", " rgb_img = Image.fromarray(o[\"rgb\"], mode=\"RGB\")\n", " \n", " vis = cv2.hconcat([o[\"seg\"], o[\"depth\"].repeat(3, axis=-1)])\n", " h, w, _ = o[\"rgb\"].shape\n", " vis_w = vis.shape[1]\n", " image = cv2.resize(o[\"rgb\"], (vis_w, int(h * vis_w / w)))\n", " vis = cv2.vconcat([vis, image])\n", " \n", " # Quick visualization:\n", " # import matplotlib.pyplot as plt;plt.imshow(vis);plt.show()\n", " frames.append(vis)\n", " env.close()\n", " \n", " media.show_video(frames, fps=fps, width=600)" ] }, { "cell_type": "code", "execution_count": null, "id": "080d5870-0abd-4826-95bf-5a6ed373bf9b", "metadata": { "tags": [ "skip-execution" ] }, "outputs": [], "source": "" } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.20" }, "mystnb": { "execution_mode": "auto" } }, "nbformat": 4, "nbformat_minor": 5 }