
Mastering Autonomous Parking with SAC and HER: A Deep Reinforcement Learning Guide
Introduction
Autonomous driving is being transformed by deep reinforcement learning, where vehicles learn complex maneuvers through trial and error. Parking is one of the toughest of these maneuvers: it demands precise, smooth control and real-time decision making. In this technical guide, we'll break down how combining Soft Actor-Critic (SAC) with Hindsight Experience Replay (HER) in the highway-env simulator produces an agent that learns to park reliably.
We’ll break down the implementation step-by-step, covering:
- Why SAC + HER is effective for sparse reward tasks like parking
- Setting up the environment and training pipeline
- Handling checkpoints and replay buffers for efficient training
- Evaluating and optimizing the model
By the end, you’ll have a complete understanding of how to train an RL agent for autonomous parking using Stable Baselines3.
Why SAC and HER for Autonomous Parking?
1. Soft Actor-Critic (SAC) – A Strong Fit for Continuous Control
SAC is an off-policy, actor-critic algorithm that:
- Maximizes both reward and policy entropy, which encourages exploration (see the objective below)
- Works well in continuous action spaces (crucial for smooth steering and acceleration)
- Handles high-dimensional observations (like sensor inputs in autonomous driving)
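For reference, in its standard formulation SAC optimizes an entropy-regularized return rather than the plain expected return:

J(\pi) = \sum_t \mathbb{E}_{(s_t, a_t) \sim \rho_\pi}\left[ r(s_t, a_t) + \alpha \, \mathcal{H}\big(\pi(\cdot \mid s_t)\big) \right]

Here the temperature α controls how strongly exploration (entropy) is weighted against reward; Stable Baselines3 tunes it automatically by default.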
2. Hindsight Experience Replay (HER) – Solving Sparse Rewards
Parking tasks often have sparse rewards—the agent only gets rewarded when it successfully parks. HER helps by:
- Replaying failed episodes with modified goals (e.g., “you didn’t park, but you got closer”)
- Accelerating learning by making better use of past experiences (a minimal sketch of this relabeling idea follows below)
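To make the idea concrete, here is a toy sketch of goal relabeling with the "future" strategy. This is not SB3's actual HerReplayBuffer implementation; the transition format and the sparse 0/-1 reward rule are simplifying assumptions for illustration only. In the real pipeline below, HerReplayBuffer does all of this for us.

import random

def her_relabel(episode, n_sampled_goal=4):
    """episode: list of transitions, each a dict with 'achieved_goal' and 'desired_goal' keys."""
    relabeled = []
    for t, transition in enumerate(episode):
        future = episode[t:]  # "future" strategy: pick goals the agent actually reached later on
        for _ in range(n_sampled_goal):
            new_goal = random.choice(future)["achieved_goal"]
            # Recompute a sparse reward against the substituted goal
            reward = 0.0 if new_goal == transition["achieved_goal"] else -1.0
            relabeled.append({**transition, "desired_goal": new_goal, "reward": reward})
    return relabeled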
Let's See the Code
Basic installations and imports
!pip install stable-baselines3
!pip install highway-env
import os
from pathlib import Path
import glob
import gymnasium as gym
import highway_env
from stable_baselines3 import HerReplayBuffer, SAC
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.logger import configure
import warnings
warnings.filterwarnings('ignore')
Base Directories
# ====================================================
# Base directories
# ====================================================
BASE_DIR = ".../Stable_Baseline/SAC_Highway"  # adjust to your own setup
Path(BASE_DIR).mkdir(parents=True, exist_ok=True)
MODEL_PATH = os.path.join(BASE_DIR, "her_sac_highway")
LOG_DIR = os.path.join(BASE_DIR, "logs")
BEST_MODEL_PATH = os.path.join(BASE_DIR, "best_model")
CHECKPOINT_DIR = os.path.join(LOG_DIR, "checkpoints")
TOTAL_TIMESTEPS = int(1e4)  # small demo budget; increase for a well-trained agent
EVAL_FREQ = 1000            # evaluate (and checkpoint) every 1000 steps
N_EVAL_EPISODES = 5
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(BEST_MODEL_PATH, exist_ok=True)
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
Saving model and buffer periodically
class CheckpointWithBufferCallback(BaseCallback):
    """Periodically saves the model and its replay buffer, keeping only the most recent checkpoints."""

    def __init__(self, save_freq, save_path, name_prefix="model_checkpoint", keep_last=3, verbose=0):
        super().__init__(verbose)
        self.save_freq = save_freq
        self.save_path = save_path
        self.name_prefix = name_prefix
        self.keep_last = keep_last
        os.makedirs(save_path, exist_ok=True)

    def _on_step(self) -> bool:
        if self.n_calls % self.save_freq == 0:
            # Save model + replay buffer
            model_file = os.path.join(self.save_path, f"{self.name_prefix}_{self.n_calls}_steps.zip")
            buffer_file = model_file.replace(".zip", "_replay_buffer.pkl")
            self.model.save(model_file)
            self.model.save_replay_buffer(buffer_file)
            if self.verbose > 0:
                print(f"Saved checkpoint at {self.n_calls} steps")
            # Clean up old checkpoints
            self._cleanup_old_checkpoints()
        return True

    def _cleanup_old_checkpoints(self):
        ckpts = sorted(glob.glob(os.path.join(self.save_path, f"{self.name_prefix}_*_steps.zip")),
                       key=lambda x: int(x.split("_")[-2]))
        if len(ckpts) > self.keep_last:
            old_ckpts = ckpts[:len(ckpts) - self.keep_last]
            for ckpt in old_ckpts:
                buf = ckpt.replace(".zip", "_replay_buffer.pkl")  # matches the name used when saving
                os.remove(ckpt)
                if os.path.exists(buf):
                    os.remove(buf)
                if self.verbose > 0:
                    print(f"Removed old checkpoint: {ckpt}")
Creating the Gym Environment
env = gym.make("parking-v0")      # create the base environment
env = Monitor(env, LOG_DIR)       # add logging wrapper
env = DummyVecEnv([lambda: env])  # convert to SB3-compatible vectorized format
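As an optional sanity check (assuming the default parking-v0 configuration), you can confirm that the environment is goal-conditioned, which is what HerReplayBuffer expects: a Dict observation space with achieved_goal/desired_goal entries and a compute_reward() method on the underlying env.

# Optional sanity check: HER expects a goal-conditioned environment.
print(env.observation_space)   # should be a Dict with observation / achieved_goal / desired_goal
print(env.action_space)        # continuous (steering, throttle)
print(hasattr(env.envs[0].unwrapped, "compute_reward"))  # should print True for parking-v0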
Configuration for SAC
n_sampled_goal = 4

model_kwargs = {
    "policy": "MultiInputPolicy",
    "env": env,
    "replay_buffer_class": HerReplayBuffer,
    "replay_buffer_kwargs": dict(
        n_sampled_goal=n_sampled_goal,
        goal_selection_strategy="future",
    ),
    "verbose": 1,
    "buffer_size": int(1e6),
    "learning_rate": 1e-3,
    "gamma": 0.95,
    "batch_size": 256,
    "policy_kwargs": dict(net_arch=[256, 256, 256]),
    "tensorboard_log": LOG_DIR,
}
- MultiInputPolicy: Used when observations consist of multiple inputs (e.g., dicts with sensor data + goals). Required for HER.
- net_arch=[256, 256, 256]: Defines a neural network with 3 hidden layers of 256 units each. Deeper networks can capture complex parking maneuvers.
- n_sampled_goal=4: For each transition, HER adds 4 synthetic goals to the buffer (improves sample efficiency).
- goal_selection_strategy="future": Uses future states in the episode as alternate goals ("Even if you didn't park here, getting closer is useful!").
- buffer_size: Size of the replay buffer (large enough to store diverse experiences).
- gamma: How much the agent values future rewards (0.95 balances immediate and long-term rewards).
- batch_size: Number of samples used per gradient update.
- verbose=1: Prints training progress.
- tensorboard_log: Saves training metrics (rewards, losses) for visualization.
Find latest checkpoint if available
def get_latest_checkpoint(checkpoint_dir):
    ckpts = glob.glob(os.path.join(checkpoint_dir, "model_checkpoint_*_steps.zip"))
    if not ckpts:
        return None
    # Sort by step count (extract the integer from the filename)
    ckpts_sorted = sorted(ckpts, key=lambda x: int(x.split("_")[-2]))
    return ckpts_sorted[-1]

latest_ckpt = get_latest_checkpoint(CHECKPOINT_DIR)
Load model + buffer if checkpoint or base model exists
if latest_ckpt is not None:
    print(f"🔄 Resuming from latest checkpoint: {latest_ckpt}")
    model = SAC.load(latest_ckpt, env=env)
    model.set_env(env)
    buffer_path = latest_ckpt.replace(".zip", "_replay_buffer.pkl")
    if os.path.exists(buffer_path):
        print("Loading replay buffer from checkpoint...")
        model.load_replay_buffer(buffer_path)
    else:
        print("No replay buffer found for checkpoint, starting with empty buffer.")
elif os.path.exists(MODEL_PATH + ".zip"):
    print(f"Loading base model from {MODEL_PATH}")
    model = SAC.load(MODEL_PATH, env=env)
    model.set_env(env)
    buffer_path = MODEL_PATH + "_replay_buffer.pkl"  # matches the name used when saving below
    if os.path.exists(buffer_path):
        print("🔄 Loading replay buffer...")
        model.load_replay_buffer(buffer_path)
    else:
        print("No replay buffer found for base model, starting with empty buffer.")
else:
    print("Creating new model")
    model = SAC(**model_kwargs)
This block resumes training from the latest checkpoint (restoring the replay buffer if available), falls back to the base model if no checkpoint exists, and otherwise initializes a new SAC+HER model.
Logger
new_logger = configure(LOG_DIR, ["stdout", "csv", "tensorboard"])
model.set_logger(new_logger)
Sets up comprehensive logging to stdout, CSV files, and TensorBoard for real-time monitoring and analysis of training metrics.
The logger will record:
- Training rewards/episode lengths
- Evaluation metrics
- Loss values (policy/value/entropy)
- Custom metrics if added
Example TensorBoard usage (run in a shell, replacing {LOG_DIR} with your log directory):
tensorboard --logdir={LOG_DIR}
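If you prefer working with raw numbers, the "csv" writer configured above also produces a progress.csv file in LOG_DIR. A minimal sketch for inspecting it, assuming pandas is installed (exact column names can vary a little between SB3 versions):

# Sketch: inspect the CSV log written by the "csv" writer.
import pandas as pd

progress = pd.read_csv(os.path.join(LOG_DIR, "progress.csv"))
print(progress.columns.tolist())  # available metrics
cols = [c for c in ["time/total_timesteps", "rollout/ep_rew_mean"] if c in progress.columns]
print(progress[cols].tail())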
Callbacks
eval_callback = EvalCallback(
    env,
    best_model_save_path=BEST_MODEL_PATH,
    log_path=LOG_DIR,
    eval_freq=EVAL_FREQ,
    n_eval_episodes=N_EVAL_EPISODES,
    deterministic=True,
    render=False,
    verbose=1,
)

checkpoint_with_buffer_callback = CheckpointWithBufferCallback(
    save_freq=EVAL_FREQ,
    save_path=CHECKPOINT_DIR,
    name_prefix="model_checkpoint",
    keep_last=1,
    verbose=1,
)
- EvalCallback – Automatically evaluates model performance during training, saves the best version, and logs metrics to track progress.
- CheckpointWithBufferCallback – Periodically saves the complete training state (model + replay buffer) while managing disk space by keeping only recent checkpoints.
Train
print(f" Starting training - outputs in {BASE_DIR}")
model.learn(
total_timesteps=TOTAL_TIMESTEPS,
callback=[eval_callback, checkpoint_with_buffer_callback],
tb_log_name="SAC_HER",
reset_num_timesteps=False, # keep logs continuous
progress_bar=True
)
model.save(MODEL_PATH)
model.save_replay_buffer(MODEL_PATH + "_replay_buffer")
print(f"Training completed. Final model saved to {MODEL_PATH}")
print(f"Replay buffer saved to {MODEL_PATH}_replay_buffer")
print(f"Best model saved to {BEST_MODEL_PATH}")
print(f"Logs and checkpoints saved to {LOG_DIR}")
This block runs the training loop and saves the final model and replay buffer.
Evaluate
from stable_baselines3.common.evaluation import evaluate_policy

def get_latest_checkpoint(checkpoint_dir):
    # Same helper as above, redefined so this evaluation cell can run on its own
    ckpts = glob.glob(os.path.join(checkpoint_dir, "model_checkpoint_*_steps.zip"))
    if not ckpts:
        return None
    ckpts_sorted = sorted(ckpts, key=lambda x: int(x.split("_")[-2]))
    return ckpts_sorted[-1]

latest_ckpt = get_latest_checkpoint(CHECKPOINT_DIR)

env = gym.make("parking-v0", render_mode="human")  # render so you can watch the agent
env = Monitor(env, LOG_DIR)
env = DummyVecEnv([lambda: env])

if latest_ckpt is not None:
    print(f"Loading latest checkpointed model: {latest_ckpt}")
    model = SAC.load(latest_ckpt, env=env)
elif os.path.exists(MODEL_PATH + ".zip"):
    print(f"Loading base model: {MODEL_PATH}")
    model = SAC.load(MODEL_PATH, env=env)
else:
    raise FileNotFoundError("No trained model or checkpoint found to evaluate.")

mean_reward, std_reward = evaluate_policy(
    model,
    env,
    n_eval_episodes=10,
    deterministic=True,
    render=True,
)

print("Evaluation completed over 10 episodes")
print(f"Mean Reward: {mean_reward:.2f} ± {std_reward:.2f}")
The key purpose of this step is to validate the trained agent: it locates the most recent model, sets up a rendering evaluation environment, runs 10 deterministic episodes with evaluate_policy, and prints the mean and standard deviation of the reward.
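Mean reward alone can be hard to interpret for a goal-based task, so it helps to also track the success rate. The sketch below assumes the parking task reports an is_success flag in the step info dict; if your highway-env version does not, adapt the check accordingly.

# Rough success-rate check (sketch): run a few episodes on a plain env
# and count how many end flagged as successful.
raw_env = gym.make("parking-v0")
successes, episodes = 0, 10
for _ in range(episodes):
    obs, _ = raw_env.reset()
    done, truncated, info = False, False, {}
    while not (done or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = raw_env.step(action)
    successes += int(info.get("is_success", False))
raw_env.close()
print(f"Success rate: {successes}/{episodes}")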
The output as a Video
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
import os
import imageio

video_dir = os.path.join(LOG_DIR, "videos")
os.makedirs(video_dir, exist_ok=True)

# Collect frames from 5 episodes into a single list
frames = []
eval_env = gym.make("parking-v0", render_mode="rgb_array")

for episode in range(5):
    obs, _ = eval_env.reset()
    done, truncated = False, False
    while not (done or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = eval_env.step(action)
        # Capture the rendered frame
        frame = eval_env.render()
        frames.append(frame)

eval_env.close()

# Save all frames as a single video (writing .mp4 requires the imageio-ffmpeg backend)
output_path = os.path.join(video_dir, "five_episodes.mp4")
imageio.mimsave(output_path, frames, fps=10)
print(f"Combined video saved at: {output_path}")
Creates a combined video of 5 evaluation episodes to visualize the agent’s parking performance.
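As an alternative to collecting frames by hand, the RecordVideo wrapper imported above can write one file per episode for you; a minimal sketch, assuming the model and video_dir from the previous block (depending on your gymnasium version this may require moviepy):

# Alternative sketch: let gymnasium's RecordVideo wrapper handle recording.
# One video file per episode is written into video_dir.
record_env = gym.make("parking-v0", render_mode="rgb_array")
record_env = RecordVideo(
    record_env,
    video_folder=video_dir,
    episode_trigger=lambda ep: True,  # record every episode
    name_prefix="parking_eval",
)
for _ in range(2):
    obs, _ = record_env.reset()
    done, truncated = False, False
    while not (done or truncated):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = record_env.step(action)
record_env.close()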