Mastering Autonomous Parking with SAC and HER: A Deep Reinforcement Learning Guide


Introduction

Autonomous driving is being transformed by deep reinforcement learning, with vehicles learning complex maneuvers through trial and error. Parking is among the toughest of these maneuvers: it demands precise, continuous control and real-time decision making. In this technical guide, we'll break down how combining Soft Actor-Critic (SAC) with Hindsight Experience Replay (HER) in the highway-env simulator produces an agent that can park reliably.


We’ll break down the implementation step-by-step, covering:

  • Why SAC + HER is effective for sparse reward tasks like parking
  • Setting up the environment and training pipeline
  • Handling checkpoints and replay buffers for efficient training
  • Evaluating and optimizing the model

By the end, you’ll have a complete understanding of how to train an RL agent for autonomous parking using Stable Baselines3.

Why SAC and HER for Autonomous Parking?

1. Soft Actor-Critic (SAC) – A Strong Fit for Continuous Control

SAC is an off-policy, actor-critic algorithm that:

  • Maximizes both reward and policy entropy, which encourages exploration (see the objective below)
  • Works well in continuous action spaces (crucial for smooth steering and acceleration)
  • Handles high-dimensional observations (like sensor inputs in autonomous driving)

2. Hindsight Experience Replay (HER) – Solving Sparse Rewards

Parking tasks often have sparse rewards—the agent only gets rewarded when it successfully parks. HER helps by:

  • Replaying failed episodes with modified goals (e.g., “you didn’t park, but you got closer”); see the relabeling sketch after this list
  • Accelerating learning by making better use of past experiences
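To make the first point concrete, here is a minimal, self-contained sketch of the relabeling idea. This is illustrative only; SB3's HerReplayBuffer does this internally, and the function and field names below are hypothetical:

# Illustrative sketch of hindsight relabeling (not SB3's actual implementation).
def relabel_with_hindsight(transition, hindsight_goal, compute_reward):
    """Turn a 'failed' transition into a useful one by swapping in a goal
    that was actually achieved later in the same episode."""
    relabeled = dict(transition)                 # copy the stored transition
    relabeled["desired_goal"] = hindsight_goal   # pretend this was the goal all along
    relabeled["reward"] = compute_reward(        # recompute the reward w.r.t. the new goal
        transition["achieved_goal"], hindsight_goal, {}
    )
    return relabeled

With goal_selection_strategy="future", the hindsight goal is sampled from states the agent actually reached later in the same episode.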

Let's Walk Through the Code

Basic installation and imports

!pip install stable-baselines3
!pip install highway-env
# Note: progress_bar=True in model.learn() needs tqdm and rich;
# installing "stable-baselines3[extra]" pulls them (and TensorBoard) in.
import os
from pathlib import Path
import glob
import gymnasium as gym
import highway_env
from stable_baselines3 import HerReplayBuffer, SAC
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.logger import configure
import warnings

warnings.filterwarnings('ignore')

Base Directories

# ====================================================
# Base directories
# ====================================================
BASE_DIR = ".../Stable_Baseline/SAC_Highway"   # replace with your own working directory
Path(BASE_DIR).mkdir(parents=True, exist_ok=True)

MODEL_PATH = os.path.join(BASE_DIR, "her_sac_highway")
LOG_DIR = os.path.join(BASE_DIR, "logs")
BEST_MODEL_PATH = os.path.join(BASE_DIR, "best_model")
CHECKPOINT_DIR = os.path.join(LOG_DIR, "checkpoints")

TOTAL_TIMESTEPS = int(1e4)
EVAL_FREQ = 1000
N_EVAL_EPISODES = 5

os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(BEST_MODEL_PATH, exist_ok=True)
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

Saving model and buffer periodically

class CheckpointWithBufferCallback(BaseCallback):
    def __init__(self, save_freq, save_path, name_prefix="model_checkpoint", keep_last=3, verbose=0):
        super().__init__(verbose)
        self.save_freq = save_freq
        self.save_path = save_path
        self.name_prefix = name_prefix
        self.keep_last = keep_last
        os.makedirs(save_path, exist_ok=True)

    def _on_step(self) -> bool:
        if self.n_calls % self.save_freq == 0:
            # Save model + buffer
            model_file = os.path.join(self.save_path, f"{self.name_prefix}_{self.n_calls}_steps.zip")
            buffer_file = model_file.replace(".zip", "_replay_buffer.pkl")

            self.model.save(model_file)
            self.model.save_replay_buffer(buffer_file)
            if self.verbose > 0:
                print(f" Saved checkpoint at {self.n_calls} steps")

            # Cleanup old checkpoints
            self._cleanup_old_checkpoints()

        return True

    def _cleanup_old_checkpoints(self):
        ckpts = sorted(glob.glob(os.path.join(self.save_path, f"{self.name_prefix}_*_steps.zip")),
                       key=lambda x: int(x.split("_")[-2]))
        if len(ckpts) > self.keep_last:
            old_ckpts = ckpts[:len(ckpts) - self.keep_last]
            for ckpt in old_ckpts:
                buf = ckpt.replace(".zip", "_replay_buffer.pkl")  # match the filename used in _on_step
                os.remove(ckpt)
                if os.path.exists(buf):
                    os.remove(buf)
                if self.verbose > 0:
                    print(f" Removed old checkpoint: {ckpt}")

Creating gym Environment

env = gym.make("parking-v0")  #Create base environment
env = Monitor(env, LOG_DIR)   # add logging wrapper
env = DummyVecEnv([lambda: env])   # convert to SB3-compatible format
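parking-v0 is a goal-conditioned task: each observation is a dict with "observation", "achieved_goal", and "desired_goal" entries, which is exactly the structure HerReplayBuffer and MultiInputPolicy expect. A quick sanity check (the exact Box bounds may differ across highway-env versions):

# Inspect the goal-conditioned observation space
print(env.observation_space)
# Roughly: Dict('achieved_goal': Box(...), 'desired_goal': Box(...), 'observation': Box(...))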

Configuration for SAC

n_sampled_goal = 4
model_kwargs = {
    "policy": "MultiInputPolicy",
    "env": env,
    "replay_buffer_class": HerReplayBuffer,
    "replay_buffer_kwargs": dict(
        n_sampled_goal=n_sampled_goal,
        goal_selection_strategy="future",
    ),
    "verbose": 1,
    "buffer_size": int(1e6),
    "learning_rate": 1e-3,
    "gamma": 0.95,
    "batch_size": 256,
    "policy_kwargs": dict(net_arch=[256, 256, 256]),
    "tensorboard_log": LOG_DIR,
}
  • MultiInputPolicy: Used when observations consist of multiple inputs (e.g., dicts with sensor data + goals). Required for HER.
  • net_arch=[256, 256, 256]: Defines a neural network with 3 hidden layers (256 units each). Deeper networks can capture complex parking maneuvers; a per-network variant is sketched after this list.
  • n_sampled_goal=4: For each transition, HER adds 4 synthetic goals to the buffer (improves sample efficiency).
  • goal_selection_strategy="future": Uses future states in the episode as alternate goals (“Even if you didn’t park here, getting closer is useful!”).
  • buffer_size: Size of the replay buffer (large enough to store diverse experiences).
  • gamma: How much the agent values future rewards (0.95 balances immediate/long-term rewards).
  • batch_size: Number of samples used per gradient update.
  • verbose=1: Prints training progress.
  • tensorboard_log: Saves training metrics (rewards, losses) for visualization.
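If you want different capacities for the actor and the critics, SB3 also accepts a dict form of net_arch (the per-network variant mentioned in the list above). A sketch, with layer sizes chosen purely as an example:

# Optional variant: separate architectures for the policy (pi) and the Q-networks (qf)
policy_kwargs = dict(net_arch=dict(pi=[256, 256], qf=[256, 256, 256]))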

Find latest checkpoint if available

def get_latest_checkpoint(checkpoint_dir):
    ckpts = glob.glob(os.path.join(checkpoint_dir, "model_checkpoint_*_steps.zip"))
    if not ckpts:
        return None
    # sort by step count (extract integer from filename)
    ckpts_sorted = sorted(ckpts, key=lambda x: int(x.split("_")[-2]))
    return ckpts_sorted[-1]

latest_ckpt = get_latest_checkpoint(CHECKPOINT_DIR)

Load model + buffer if checkpoint or base model exists

if latest_ckpt is not None:
    print(f"🔄 Resuming from latest checkpoint: {latest_ckpt}")
    model = SAC.load(latest_ckpt, env=env)
    model.set_env(env)

    # buffer_path = latest_ckpt.replace(".zip", "_replay_buffer")
    buffer_path = latest_ckpt.replace(".zip", "_replay_buffer.pkl")
    if os.path.exists(buffer_path):
        print("Loading replay buffer from checkpoint...")
        model.load_replay_buffer(buffer_path)
    else:
        print("No replay buffer found for checkpoint, starting with empty buffer.")

elif os.path.exists(MODEL_PATH + ".zip"):
    print(f"Loading base model from {MODEL_PATH}")
    model = SAC.load(MODEL_PATH, env=env)
    model.set_env(env)

    buffer_path = MODEL_PATH + "_replay_buffer.pkl"  # SB3 appends .pkl when saving
    if os.path.exists(buffer_path):
        print("🔄 Loading replay buffer...")
        model.load_replay_buffer(buffer_path)
    else:
        print("No replay buffer found for base model, starting with empty buffer.")

else:
    print("Creating new model")
    model = SAC(**model_kwargs)

This block resumes training from the latest checkpoint (restoring its replay buffer if one was saved), falls back to the base model if no checkpoint exists, and otherwise initializes a new SAC+HER model.

Logger

new_logger = configure(LOG_DIR, ["stdout", "csv", "tensorboard"])
model.set_logger(new_logger)

Sets up comprehensive logging to stdout, CSV files, and TensorBoard for real-time monitoring and analysis of training metrics.

The logger will record:

  • Training rewards/episode lengths
  • Evaluation metrics
  • Loss values (policy/value/entropy)
  • Custom metrics, if you add them (see the sketch below)
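As an example of the last item, any callback can push extra scalars through the SB3 logger. A minimal sketch using the BaseCallback imported earlier; the callback name, the metric name, and the assumption that the env puts a "distance" key in its info dict are all hypothetical:

class DistanceLoggingCallback(BaseCallback):
    """Logs a custom scalar to TensorBoard/CSV on every step (illustrative)."""

    def _on_step(self) -> bool:
        infos = self.locals.get("infos", [])   # per-env info dicts from the current rollout step
        if infos and "distance" in infos[0]:   # assumes the env reports a 'distance' entry
            self.logger.record("custom/distance_to_goal", infos[0]["distance"])
        return True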

Example TensorBoard usage

# bash: run in a terminal, pointing at the LOG_DIR defined above
tensorboard --logdir {LOG_DIR}

Callbacks


eval_callback = EvalCallback(
    env,
    best_model_save_path=BEST_MODEL_PATH,
    log_path=LOG_DIR,
    eval_freq=EVAL_FREQ,
    n_eval_episodes=N_EVAL_EPISODES,
    deterministic=True,
    render=False,
    verbose=1
)

checkpoint_with_buffer_callback = CheckpointWithBufferCallback(
    save_freq=EVAL_FREQ,
    save_path=CHECKPOINT_DIR,
    name_prefix="model_checkpoint",
    keep_last=1,
    verbose=1
)
  • EvalCallback – automatically evaluates the model during training, saves the best-performing version, and logs metrics to track progress (see the note on a dedicated eval env after this list).
  • CheckpointWithBufferCallback – periodically saves the complete training state (model + replay buffer) while managing disk space by keeping only the most recent checkpoints.
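Note that EvalCallback above reuses the training env for evaluation. A dedicated evaluation environment is often cleaner, since evaluation episodes then don't mix into the training env's episode statistics; a sketch of that variant (not what the code above does):

# Optional: evaluate on a dedicated environment instance
eval_env = DummyVecEnv([lambda: Monitor(gym.make("parking-v0"), LOG_DIR)])

eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=BEST_MODEL_PATH,
    log_path=LOG_DIR,
    eval_freq=EVAL_FREQ,
    n_eval_episodes=N_EVAL_EPISODES,
    deterministic=True,
)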

Train

print(f" Starting training - outputs in {BASE_DIR}")
model.learn(
    total_timesteps=TOTAL_TIMESTEPS,
    callback=[eval_callback, checkpoint_with_buffer_callback],
    tb_log_name="SAC_HER",
    reset_num_timesteps=False,   # keep logs continuous
    progress_bar=True
)


model.save(MODEL_PATH)
model.save_replay_buffer(MODEL_PATH + "_replay_buffer.pkl")

print(f"Training completed. Final model saved to {MODEL_PATH}")
print(f"Replay buffer saved to {MODEL_PATH}_replay_buffer.pkl")
print(f"Best model saved to {BEST_MODEL_PATH}")
print(f"Logs and checkpoints saved to {LOG_DIR}")

This runs training with both callbacks attached and, once it finishes, saves the final model and replay buffer alongside the logs.

Evaluate

from stable_baselines3.common.evaluation import evaluate_policy

# Re-defined here so the evaluation section can run as a standalone script/notebook cell
def get_latest_checkpoint(checkpoint_dir):
    ckpts = glob.glob(os.path.join(checkpoint_dir, "model_checkpoint_*_steps.zip"))
    if not ckpts:
        return None
    ckpts_sorted = sorted(ckpts, key=lambda x: int(x.split("_")[-2]))
    return ckpts_sorted[-1]

latest_ckpt = get_latest_checkpoint(CHECKPOINT_DIR)

env = gym.make("parking-v0", render_mode="human")  # render so you can see
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])

if latest_ckpt is not None:
    print(f"Loading latest checkpointed model: {latest_ckpt}")
    model = SAC.load(latest_ckpt, env=env)
elif os.path.exists(MODEL_PATH + ".zip"):
    print(f" Loading base model: {MODEL_PATH}")
    model = SAC.load(MODEL_PATH, env=env)
else:
    raise FileNotFoundError("No trained model or checkpoint found to evaluate.")

mean_reward, std_reward = evaluate_policy(
    model,
    env,
    n_eval_episodes=10,
    deterministic=True,
    render=True
)

print(f" Evaluation completed over 10 episodes")
print(f"   Mean Reward: {mean_reward:.2f} ± {std_reward:.2f}")

The purpose of this step is to validate the trained model: it locates the most recent model (latest checkpoint or base model), builds a rendering evaluation environment, runs 10 deterministic episodes with evaluate_policy, and prints the mean and standard deviation of the episode reward.
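Mean reward alone can be hard to interpret for a goal-conditioned task. If the environment reports success in its info dict (highway-env's parking task exposes an "is_success" flag; treat that as an assumption to verify for your version), a success rate can be estimated with a short manual loop:

# Rough success-rate estimate (assumes the env's info dict contains "is_success")
raw_env = gym.make("parking-v0")
n_episodes, successes = 10, 0

for _ in range(n_episodes):
    obs, _ = raw_env.reset()
    terminated = truncated = False
    while not (terminated or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = raw_env.step(action)
    successes += int(info.get("is_success", False))

raw_env.close()
print(f"Success rate: {successes / n_episodes:.0%}")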

The Output as a Video

import os

import gymnasium as gym
import imageio

# LOG_DIR = "./logs"
video_dir = os.path.join(LOG_DIR, "videos")
os.makedirs(video_dir, exist_ok=True)

# Collect frames from 5 episodes into a single list
frames = []

eval_env = gym.make("parking-v0", render_mode="rgb_array")

for episode in range(5):
    obs, _ = eval_env.reset()
    terminated, truncated = False, False

    while not (terminated or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = eval_env.step(action)

        # Capture rendered frame
        frame = eval_env.render()
        frames.append(frame)

eval_env.close()

# Save all frames as a single video
output_path = os.path.join(video_dir, "five_episodes.mp4")
imageio.mimsave(output_path, frames, fps=10)

print(f"Combined video saved at: {output_path}")

Creates a combined video of 5 evaluation episodes to visualize the agent’s parking performance.
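Alternatively, gymnasium's RecordVideo wrapper can write one MP4 per episode without collecting frames by hand (it requires moviepy to be installed). A sketch reusing the same video_dir:

from gymnasium.wrappers import RecordVideo

# Wrap the env so every episode is written to its own video file
record_env = gym.make("parking-v0", render_mode="rgb_array")
record_env = RecordVideo(record_env, video_folder=video_dir, episode_trigger=lambda ep: True)

for _ in range(5):
    obs, _ = record_env.reset()
    terminated = truncated = False
    while not (terminated or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = record_env.step(action)

record_env.close()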

Output Video

Code on GitHub
