Demonstration on MultigoalIntersection

In this notebook, we demonstrate how to set up a multigoal intersection environment in which you can access relevant stats (e.g., route completion, reward, success rate) for all four possible goals (right turn, left turn, going straight, U-turn) simultaneously.

We show how to build the environment, in which we have trained a SAC expert that achieves a 99% success rate, and how to access these stats in the info dict returned at each step.
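For reference, every per-goal stat is stored in the info dict under a key of the form "<stat>/goals/<goal_name>" (e.g. "reward/goals/left_turn"). Below is a minimal sketch of reading these keys directly from the raw environment; the action passed to step() is just a placeholder:

from metadrive.envs.multigoal_intersection import MultiGoalIntersectionEnv

env = MultiGoalIntersectionEnv(dict(use_multigoal_intersection=True))
obs, info = env.reset()
obs, reward, terminated, truncated, info = env.step([0.0, 0.1])  # placeholder action
for goal in ["right_turn", "left_turn", "go_straight", "u_turn"]:
    print(goal,
          info["reward/goals/{}".format(goal)],
          info["route_completion/goals/{}".format(goal)],
          info["arrive_dest/goals/{}".format(goal)])
env.close()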

Note: We pretrain the SAC expert with use_multigoal_intersection=False and then finetune it with use_multigoal_intersection=True.
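A minimal sketch of that two-stage schedule, showing only the environment construction (the SAC training itself is omitted and would come from your RL library of choice):

from metadrive.envs.gym_wrapper import create_gym_wrapper
from metadrive.envs.multigoal_intersection import MultiGoalIntersectionEnv

env_cls = create_gym_wrapper(MultiGoalIntersectionEnv)

# Stage 1: pretrain SAC on the original PG scenarios (single "default" goal).
pretrain_env = env_cls(dict(use_multigoal_intersection=False))
# ... run SAC training on pretrain_env and save a checkpoint ...
pretrain_env.close()

# Stage 2: fine-tune the pretrained policy with the multigoal intersection enabled.
finetune_env = env_cls(dict(use_multigoal_intersection=True))
# ... restore the checkpoint and continue SAC training on finetune_env ...
finetune_env.close()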

import numpy as np
from metadrive.envs.gym_wrapper import create_gym_wrapper
from metadrive.envs.multigoal_intersection import MultiGoalIntersectionEnv
import mediapy as media

render = False
num_scenarios = 1000
start_seed = 100
goal_probabilities = {
    "right_turn": 0.25,
    "left_turn": 0.25,
    "go_straight": 0.25,
    "u_turn": 0.25
}


class MultiGoalWrapped(MultiGoalIntersectionEnv):
    """Samples one goal per episode and exposes that goal's observation, reward and stats."""

    current_goal = None

    def step(self, actions):
        o, r, tm, tc, i = super().step(actions)

        # Replace the observation and reward with those of the current goal,
        # and mirror the current goal's stats into the "default" keys.
        o = i['obs/goals/{}'.format(self.current_goal)]
        r = i['reward/goals/{}'.format(self.current_goal)]
        i['route_completion'] = i['route_completion/goals/{}'.format(self.current_goal)]
        i['arrive_dest'] = i['arrive_dest/goals/{}'.format(self.current_goal)]
        i['reward/goals/default'] = i['reward/goals/{}'.format(self.current_goal)]
        i['route_completion/goals/default'] = i['route_completion/goals/{}'.format(self.current_goal)]
        i['arrive_dest/goals/default'] = i['arrive_dest/goals/{}'.format(self.current_goal)]
        i["current_goal"] = self.current_goal
        return o, r, tm, tc, i

    def reset(self, *args, **kwargs):
        o, i = super().reset(*args, **kwargs)

        # Sample a goal for this episode. Without the multigoal intersection,
        # only the original "default" goal is available.
        if self.config["use_multigoal_intersection"]:
            p = goal_probabilities
            self.current_goal = np.random.choice(list(p.keys()), p=list(p.values()))
        else:
            self.current_goal = "default"

        # Expose the sampled goal's observation and mirror its stats into the
        # "default" keys, using the same mapping as in step().
        o = i['obs/goals/{}'.format(self.current_goal)]
        i['route_completion'] = i['route_completion/goals/{}'.format(self.current_goal)]
        i['arrive_dest'] = i['arrive_dest/goals/{}'.format(self.current_goal)]
        i['reward/goals/default'] = i['reward/goals/{}'.format(self.current_goal)]
        i['route_completion/goals/default'] = i['route_completion/goals/{}'.format(self.current_goal)]
        i['arrive_dest/goals/default'] = i['arrive_dest/goals/{}'.format(self.current_goal)]
        i["current_goal"] = self.current_goal

        return o, i

env_config = dict(
    use_render=render,
    manual_control=False,
    vehicle_config=dict(show_lidar=False, show_navi_mark=True, show_line_to_navi_mark=True,
                        show_line_to_dest=True, show_dest_mark=True),
    horizon=500,  # to speed up training

    traffic_density=0.06,

    use_multigoal_intersection=True,  # Set to False to use the same observation but with the original PG scenarios.
    out_of_route_done=False,

    num_scenarios=num_scenarios,
    start_seed=start_seed,
    accident_prob=0.8,
    crash_vehicle_done=False,
    crash_object_done=False,
)

wrapped = create_gym_wrapper(MultiGoalWrapped)

env = wrapped(env_config)
[INFO] Environment: MultiGoalWrapped
[INFO] MetaDrive version: 0.4.2.3
[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector()]
[INFO] Render Mode: none
[INFO] Horizon (Max steps per agent): 500
frames = []

env.reset()
while True:
    action = [0, 1]  # constant action: zero steering, full throttle
    o, r, d, i = env.step(action)
    frame = env.render(mode="topdown")
    frames.append(frame)
    if d:
        break
[INFO] Assets version: 0.4.2.3
[INFO] Known Pipes: glxGraphicsPipe
[INFO] Start Scenario Index: 100, Num Scenarios : 1000
[WARNING] env.vehicle will be deprecated soon. Use env.agent instead (base_env.py:731)
[INFO] Episode ended! Scenario Index: 606 Reason: arrive_dest.
print("Output at final step:")

# Print all scalar and string entries of the last step's info dict, sorted by key.
i = {k: i[k] for k in sorted(i.keys())}
for k, v in i.items():
    if isinstance(v, str):
        s = v
    elif np.iterable(v):
        continue
    else:
        s = "{:.3f}".format(v)
    print("\t{}: {}".format(k, s))
Output at final step:
	acceleration: 1.000
	arrive_dest: 1.000
	arrive_dest/goals/default: 1.000
	arrive_dest/goals/go_straight: 1.000
	arrive_dest/goals/left_turn: 0.000
	arrive_dest/goals/right_turn: 0.000
	arrive_dest/goals/u_turn: 0.000
	cost: 0.000
	crash: 0.000
	crash_building: 0.000
	crash_human: 0.000
	crash_object: 0.000
	crash_sidewalk: 0.000
	crash_vehicle: 0.000
	current_goal: go_straight
	env_seed: 606.000
	episode_energy: 6.986
	episode_length: 88.000
	episode_reward: 35.834
	max_step: 0.000
	navigation_command: right
	navigation_forward: 0.000
	navigation_left: 0.000
	navigation_right: 1.000
	out_of_road: 0.000
	overtake_vehicle_num: 0.000
	policy: EnvInputPolicy
	reward/default_reward: -10.000
	reward/goals/default: 12.335
	reward/goals/go_straight: 12.335
	reward/goals/left_turn: -10.000
	reward/goals/right_turn: -10.000
	reward/goals/u_turn: -10.000
	route_completion: 0.969
	route_completion/goals/default: 0.969
	route_completion/goals/go_straight: 0.969
	route_completion/goals/left_turn: 0.632
	route_completion/goals/right_turn: 0.643
	route_completion/goals/u_turn: 0.552
	steering: 0.000
	step_energy: 0.162
	velocity: 22.313
media.show_video(frames)