Teaching Robots to Slide Objects: TQC Reinforcement Learning on FetchSlideDense-v4


Introduction

In this step-by-step tutorial, you will learn how to train a Truncated Quantile Critics (TQC) agent on the FetchSlideDense-v4 environment using Stable-Baselines3 and Gymnasium-Robotics. We’ll start by setting up the environment and importing the necessary libraries, then move on to training the agent while monitoring progress with TensorBoard. Along the way, you’ll see how to record evaluation metrics, save training videos of the robot’s performance, and store the trained model for later use. By the end of this tutorial, you’ll have a complete workflow, from environment setup to visualization and video recording, giving you practical experience in applying distributional reinforcement learning to a challenging robotics task.

Video

The following video demonstrates what we are going to achieve.

Introduction to Fetch Environments

The Fetch environments, part of the Gymnasium-Robotics suite developed by the Farama Foundation, center around a 7-DoF Fetch Mobile Manipulator outfitted with a parallel two-fingered gripper. They serve as standard benchmarks in multi-goal reinforcement learning, featuring tasks like reaching (FetchReach), pushing (FetchPush), sliding a puck (FetchSlide), and pick-and-place operations (FetchPickAndPlace).

Among these, FetchSlide presents a uniquely challenging scenario: the robot arm must strike a puck placed on a long, slippery table so that it slides toward, and comes to rest at, a target location that is often outside the robot’s direct reach. In its dense-reward variant, FetchSlideDense, the agent receives continuous feedback proportional to the negative Euclidean distance between the puck’s current position (“achieved_goal”) and the desired position (“desired_goal”), offering a richer learning signal than the sparse-reward setup.
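
To make the dense reward concrete, here is a minimal sketch (not part of the tutorial code) of how that distance-based reward is computed from the achieved_goal and desired_goal entries of the observation; the positions below are made up purely for illustration.

import numpy as np

def dense_reward(achieved_goal, desired_goal):
    # Dense reward: negative Euclidean distance between the puck's current
    # position (achieved_goal) and the target position (desired_goal)
    return -float(np.linalg.norm(achieved_goal - desired_goal))

# Hypothetical 3D positions, for illustration only
achieved = np.array([1.30, 0.75, 0.41])
desired = np.array([1.65, 0.90, 0.41])
print(dense_reward(achieved, desired))  # ~ -0.38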

Introduction to TQC

Truncated Quantile Critics (TQC) is a novel approach designed to mitigate the common problem of overestimation bias in off-policy reinforcement learning—especially in continuous control domains. TQC achieves this by combining three key components: (1) a distributional critic that models the full return distribution rather than only its expectation, (2) truncation, which discards a portion of the highest quantile predictions to reduce undue optimism, and (3) ensembling multiple critic networks to further stabilize value estimates. These innovations allow granular control over overestimation and improve robustness.
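
The following toy sketch (an illustration only, not the sb3-contrib implementation) shows the core "pool, sort, truncate" idea behind TQC's target computation; the ensemble size, quantile count, and number of dropped quantiles are arbitrary example values.

import numpy as np

n_critics, n_quantiles, drop_per_net = 2, 25, 2

# Pretend each critic in the ensemble predicts n_quantiles samples
# of the return distribution for one (state, action) pair
quantiles = np.random.randn(n_critics, n_quantiles)

# 1) Pool all quantile estimates from the ensemble and sort them
pooled = np.sort(quantiles.reshape(-1))

# 2) Truncate: drop the largest drop_per_net * n_critics quantiles,
#    i.e. the most optimistic estimates, to curb overestimation bias
kept = pooled[: n_critics * (n_quantiles - drop_per_net)]

# 3) The mean of the remaining quantiles is used when forming the target value
print(kept.mean())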


Installation

!pip install stable-baselines3[extra]

Installs Stable-Baselines3 with extra dependencies for training, evaluation, and logging reinforcement learning agents.

!pip install sb3-contrib

Installs the contrib module that includes advanced algorithms like TQC, which are not in the core SB3 package.

!pip install gymnasium-robotics

Installs robotics environments (e.g., Fetch, Shadow Hand) from the Gymnasium-Robotics package, needed to train agents on tasks like FetchSlide.

!apt-get update && apt-get install -y xvfb ffmpeg

Updates package lists and installs Xvfb (a virtual display server for rendering environments without a physical screen) and FFmpeg (for recording and saving videos of the agent’s training).

!pip install pyvirtualdisplay

Installs PyVirtualDisplay, which lets you create a virtual screen (via Xvfb) in Colab or headless servers so that environments with rendering (like robotics tasks) can run and record videos without a physical display.
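
For completeness, here is a minimal sketch of starting a virtual display with PyVirtualDisplay in Colab or on a headless server. The tutorial later also sets MuJoCo's osmesa backend for rendering, so treat this as an optional alternative.

from pyvirtualdisplay import Display

# Start an invisible Xvfb display so render calls have a screen to draw to
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()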

Imports

# Standard library
import glob          
import os            
from pathlib import Path  
  • glob : For file pattern matching (e.g., find files by wildcard)
  • os : For interacting with the operating system (paths, env vars, file ops)
  • pathlib: For object-oriented filesystem path handling
# Third-party
import gymnasium as gym        # Provides RL environments and APIs for training/evaluating agents
import gymnasium_robotics      # Extends Gymnasium with robotics simulation environments
  • gymnasium: Provides RL environments and APIs for training/evaluating agents
  • gymnasium_robotics: Extends Gymnasium with robotics simulation environments
import torch                               
from sb3_contrib import TQC                 
from stable_baselines3 import HerReplayBuffer       
from stable_baselines3.common.callbacks import EvalCallback   
from stable_baselines3.common.evaluation import evaluate_policy  
from stable_baselines3.common.monitor import Monitor           
from stable_baselines3.common.vec_env import DummyVecEnv       
from stable_baselines3.common.vec_env import VecVideoRecorder  
  • torch: PyTorch deep learning library for building and training neural networks
  • TQC: The Truncated Quantile Critics algorithm from SB3-Contrib (sb3_contrib)
  • HerReplayBuffer: Hindsight Experience Replay buffer for goal-based RL tasks
  • EvalCallback: Callback for periodic evaluation during training
  • evaluate_policy: Utility to evaluate a trained RL policy
  • Monitor: Wrapper to record rewards, episode lengths, and logs
  • DummyVecEnv: Vectorized environment wrapper for single/multiple envs
  • VecVideoRecorder: Records agent rollouts as videos
device = "cuda" if torch.cuda.is_available() else "cpu"   
print(f"Using device: {device}")                          

Selects the GPU (CUDA) if available, otherwise falls back to the CPU, and prints which device will be used for training.

Environment

env_id = "FetchSlideDense-v4"
def make_env():
    return Monitor(gym.make(env_id))
env = DummyVecEnv([make_env for _ in range(4)])
eval_env = Monitor(gym.make(env_id))
  • env_id: The environment ID (a robotic sliding task with dense rewards)
  • make_env: Factory function that returns a Monitor-wrapped environment instance
  • env: Vectorized training environment running 4 parallel instances
  • eval_env: Separate monitored environment used only for evaluation

Why separate environment for evaluation?

During training the agent explores, and the training environment is used for both learning and logging, so the rewards recorded there are not a reliable measure of true performance.

To measure how well the agent is actually doing, we need a separate evaluation environment, where:

  • No exploration noise is added.
  • We can run multiple test episodes and get clean statistics (reward, episode length, success rate, etc.)

Monitor and DummyVecEnv

  • The Monitor wrapper is placed around Gym environments to track episode statistics during training, such as episode length, episode reward, and success (if the environment provides it).
  • SB3 algorithms require vectorized environments (environments that return batched observations and rewards); a short sketch of what this looks like follows this list.
  • DummyVecEnv is a lightweight wrapper that takes a list of functions returning environments and runs them sequentially in the same process.
  • This is useful for environments that can’t run in parallel, or when we don’t need the overhead of multiprocessing.
  • env = DummyVecEnv([make_env for _ in range(4)]): Creates a vectorized environment with 4 parallel instances; every env created is wrapped with Monitor, so training logs will include mean reward, episode length, etc.
  • The agent interacts with all 4 instances at once (in parallel batches) to improve stability and sample efficiency (more experience collected per step).
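
A quick way to see what the vectorized environment hands to the agent is to reset it and inspect the shapes. This is an illustrative snippet; the exact vector sizes depend on the environment version.

obs = env.reset()
for key, value in obs.items():
    print(key, value.shape)

# Expected keys for a Fetch task: 'observation', 'achieved_goal', 'desired_goal',
# each returned as an array whose first dimension is 4 (one row per parallel env)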

Directories

BASE_DIR = ".../RoboSlide/tqc_roboslide_10M"
Path(BASE_DIR).mkdir(parents=True, exist_ok=True)

# Subdirectories
LOG_DIR = os.path.join(BASE_DIR, "logs")
MODEL_DIR = os.path.join(BASE_DIR, "models")

Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)

# File paths
MODEL_PATH = os.path.join(MODEL_DIR, "robot_slide_sac.zip")
HER_PATH = os.path.join(MODEL_DIR, "her_robot_slide_sac.pkl")
EVAL_LOG_DIR = os.path.join(LOG_DIR, "eval")

Path(EVAL_LOG_DIR).mkdir(parents=True, exist_ok=True)

BEST_MODEL_PATH = os.path.join(MODEL_DIR, "best_model.zip")
BEST_HER_PATH = os.path.join(MODEL_DIR, "best_model_replay_buffer.pkl")

This block sets up the directory structure and file paths for training and saving models. First, a base directory (BASE_DIR) is created to store all outputs, with subdirectories for logs (LOG_DIR) and models (MODEL_DIR), each ensured to exist using Path(...).mkdir(parents=True, exist_ok=True). Within these directories, specific file paths are defined: MODEL_PATH for saving the trained model (the filename keeps an “sac” suffix, but the agent we train here is TQC), HER_PATH for saving the HER replay buffer, and EVAL_LOG_DIR for storing evaluation logs. Additionally, separate paths are reserved for the best-performing model (BEST_MODEL_PATH) and its replay buffer (BEST_HER_PATH). This structure ensures that all logs, checkpoints, and evaluation data are organized and easily accessible during and after training.

Evaluation Callback with HER

class EvalCallbackWithHER(EvalCallback):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._last_best_mean_reward = -float("inf")  # track improvements

    def _on_step(self) -> bool:
        result = super()._on_step()

        # Only run check at evaluation steps
        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            # Check if mean reward improved
            if self.last_mean_reward is not None and self.last_mean_reward > self._last_best_mean_reward:
                self._last_best_mean_reward = self.last_mean_reward

                # Save replay buffer alongside the already-saved best model
                model_file = os.path.join(self.best_model_save_path, "best_model.zip")
                if os.path.exists(model_file):
                    buffer_file = model_file.replace(".zip", "_replay_buffer.pkl")
                    self.model.save_replay_buffer(buffer_file)
                    print(f" New best model found, saved HER replay buffer to {buffer_file}")

        return result


EvalCallback

  • EvalCallback is provided by Stable-Baselines3.
  • Its job is to evaluate the agent periodically (e.g., every eval_freq steps)
  • It runs the agent in a given evaluation environment (eval_env) for a fixed number of episodes (n_eval_episodes) and computes the mean reward.
  • If the mean reward improves compared to previous evaluations, it saves the current model to best_model_save_path.
  • It also logs results to log_path.

EvalCallbackWithHER

  • This class inherits from EvalCallback and extends its functionality.
  • It also saves the HER replay buffer whenever a new best model is found.

eval_callback

eval_callback = EvalCallbackWithHER(
    eval_env,
    n_eval_episodes=10,
    eval_freq=1000,
    log_path=EVAL_LOG_DIR,
    best_model_save_path=MODEL_DIR,  # Best model goes here
    deterministic=True,
    render=False
)
  • Here we instantiate EvalCallbackWithHER with its parameters.
  • This object is then passed into the training loop of the TQC model.
  • During training, it will periodically evaluate the agent, save the best model, and also save its HER replay buffer.
  • Note that eval_freq is counted in calls to the callback; since the training environment runs 4 parallel instances, an evaluation every 1,000 calls corresponds to every 4,000 environment transitions.

The TQC Model

Model Parameters

model_kwargs = dict(
    policy="MultiInputPolicy",
    env=env,
    replay_buffer_class=HerReplayBuffer,
    replay_buffer_kwargs=dict(
        n_sampled_goal=4,
        goal_selection_strategy="future",
    ),
    learning_starts=2000,
    tensorboard_log=LOG_DIR,   
    verbose=1,
    buffer_size=int(1e6),
    learning_rate=1e-3,
    gamma=0.95,
    batch_size=256,
    tau=0.05
)

The above dictionary collects all the arguments needed to initialize TQC with HER for our robot sliding task.

  • policy="MultiInputPolicy": In Stable-Baselines3 (and SB3-Contrib), the policy defines the neural network architecture that maps observations → actions. "MultiInputPolicy" is used when the observation space is a dictionary (e.g., gym.spaces.Dict).
  • env = env: The environment defined in the section Environment
  • replay_buffer_class=HerReplayBuffer: Use Hindsight Experience Replay buffer for goal-based RL
  • replay_buffer_kwargs: Extra settings for HER buffer.
  • n_sampled_goal=4: Number of HER goals to sample per transition
  • goal_selection_strategy="future": How to relabel goals ("future" = sample achieved goals from later steps of the same episode as new goals); see the sketch after this list.
  • learning_starts=2000: Steps before training begins (collect some random experience first)
  • tensorboard_log=LOG_DIR: Path for TensorBoard logging
  • verbose=1: Logging verbosity (1 = info messages, 0 = silent)
  • buffer_size=int(1e6): Max size of the replay buffer (1 million transitions)
  • learning_rate=1e-3: Step size for gradient updates
  • gamma=0.95: Discount factor for future rewards
  • batch_size=256: Number of samples per gradient update
  • tau=0.05: Soft update rate for target networks
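
To build intuition for n_sampled_goal and the "future" strategy, here is a conceptual sketch of HER relabeling. HerReplayBuffer does this internally; the helper below is hypothetical and simply reuses the dense FetchSlide distance reward for the recomputation.

import numpy as np

def relabel_future(achieved_goals, t, rng=np.random.default_rng()):
    # achieved_goals: puck positions reached at each step of one episode
    # Pick a random step at or after t and treat its achieved goal as the target
    future_t = rng.integers(t, len(achieved_goals))
    new_goal = achieved_goals[future_t]
    # Recompute the dense reward with respect to the relabeled goal
    new_reward = -np.linalg.norm(achieved_goals[t] - new_goal)
    return new_goal, new_reward

# With n_sampled_goal=4, roughly four such relabeled copies are stored
# for every real transition, alongside the original one.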

Load_or_Create

def load_or_create_model(env, model_kwargs, device):
    model = None

    # Case 1: Try loading best model + buffer
    if os.path.exists(BEST_MODEL_PATH) and os.path.exists(BEST_HER_PATH):
        print("Loading BEST model and buffer...")
        model = TQC.load(BEST_MODEL_PATH, env=env, device=device)
        model.load_replay_buffer(BEST_HER_PATH)
    
    # Case 2: Try loading normal model + buffer
    elif os.path.exists(MODEL_PATH) and os.path.exists(HER_PATH):
        print("Best model not found. Loading latest saved model and buffer...")
        model = TQC.load(MODEL_PATH, env=env, device=device)
        model.load_replay_buffer(HER_PATH)

    # Case 3: Create new model
    else:
        print("No saved model found. Creating a NEW model...")
        model = TQC(**model_kwargs)

    return model

This function load_or_create_model ensures that training can resume seamlessly or start fresh if needed. It first checks for the existence of a best model (BEST_MODEL_PATH) along with its replay buffer (BEST_HER_PATH); if found, it loads both into memory for continued training or evaluation. If the best model isn’t available, it falls back to loading the most recently saved model (MODEL_PATH) and replay buffer (HER_PATH). Finally, if no saved models are found, it creates a brand-new TQC model using the provided model_kwargs. This design guarantees robustness by prioritizing the best checkpoint, falling back to the latest one, and otherwise initializing a fresh model.

Learning

model = load_or_create_model(env, model_kwargs, device)
TOTAL_TIMESTEPS = int(1e7)
model.learn(
    total_timesteps=TOTAL_TIMESTEPS,
    reset_num_timesteps=True,
    progress_bar=True,
    tb_log_name="her_tqc_run1",
    callback=eval_callback   # <-- add evaluation callback here
)
model.save(MODEL_PATH)
#Save the replay buffer too
model.save_replay_buffer(HER_PATH)

First, it initializes the model by calling load_or_create_model, which either loads the best checkpoint, the latest checkpoint, or creates a new TQC model with the given settings. Then, it defines the total training budget (TOTAL_TIMESTEPS = 10 million). The model.learn(...) method starts the actual training loop, where the agent interacts with the environment for the specified number of timesteps. Important options here include resetting the timestep counter (reset_num_timesteps=True), showing a live progress bar, logging results to TensorBoard under the name "her_tqc_run1", and using the custom evaluation callback (eval_callback) to periodically evaluate performance, save the best model, and also save the HER replay buffer. After training finishes, the model is saved to MODEL_PATH, and the replay buffer is explicitly saved to HER_PATH so that training progress (including HER experience) can be resumed later.
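
If you later reload a checkpoint and want to continue a run rather than start over, a small variation of the same call keeps the global step counter and appends to the existing TensorBoard curves (a sketch, reusing the names defined above):

model = load_or_create_model(env, model_kwargs, device)
model.learn(
    total_timesteps=TOTAL_TIMESTEPS,
    reset_num_timesteps=False,   # keep counting from the loaded checkpoint
    progress_bar=True,
    tb_log_name="her_tqc_run1",
    callback=eval_callback
)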

Tensorboard

TensorBoard is a visualization tool that lets you monitor and analyze machine learning training in real time.
It shows metrics like rewards, losses, learning rates, and graphs so you can track model performance and debug training.

!pip install tensorboard

Since we have defined

tensorboard_log = LOG_DIR
tb_log_name = "her_tqc_run1"

All the logs will be written to:

.../RoboSlide/tqc_roboslide_10M/logs/her_tqc_run1/

Run the following command

tensorboard --logdir=.../RoboSlide/tqc_roboslide_10M/logs --port=6006

Then open in your browser:

http://localhost:6006/

You will get the following view.

You can inspect the details of each plot and even export the underlying data.
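
If you prefer to export the curves programmatically instead of through the web UI, TensorBoard's Python API can read the event files back. A sketch is below; the run folder name is an assumption (SB3 appends a run index to tb_log_name), and "rollout/ep_rew_mean" is the episode-reward tag SB3 normally logs.

import os
from tensorboard.backend.event_processing import event_accumulator

run_dir = os.path.join(LOG_DIR, "her_tqc_run1_1")  # exact suffix may differ
ea = event_accumulator.EventAccumulator(run_dir)
ea.Reload()

# Print (timestep, mean episode reward) pairs logged during training
for event in ea.Scalars("rollout/ep_rew_mean"):
    print(event.step, event.value)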

Evaluating and Recording Video

import numpy as np
import os
from pathlib import Path
import gymnasium as gym
import gymnasium_robotics
from sb3_contrib import TQC
from stable_baselines3.common.vec_env import DummyVecEnv,VecVideoRecorder

from stable_baselines3.common.monitor import Monitor

import torch

# Set seeds for reproducibility
SEED = 42
# random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

import warnings
warnings.filterwarnings('ignore')


env_id = "FetchSlideDense-v4"

def make_env():
    return Monitor(gym.make(env_id))

env = DummyVecEnv([make_env for _ in range(4)])

# Separate evaluation env (not vectorized, just Monitor)
eval_env = Monitor(gym.make(env_id))
eval_env.reset(seed=SEED)

# Base directory for everything
BASE_DIR = ".../RoboSlide/tqc_roboslide_10M"
Path(BASE_DIR).mkdir(parents=True, exist_ok=True)

# Subdirectories

MODEL_DIR = os.path.join(BASE_DIR, "models")


Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)


BEST_MODEL_PATH = os.path.join(MODEL_DIR, "best_model.zip")
BEST_HER_PATH = os.path.join(MODEL_DIR, "best_model_replay_buffer.pkl")



def load_model(env,device):
    model = None

    # Case 1: Try loading best model + buffer
    if os.path.exists(BEST_MODEL_PATH) and os.path.exists(BEST_HER_PATH):
        print("Loading BEST model and buffer...")
        model = TQC.load(BEST_MODEL_PATH, env=env, device=device)
        model.load_replay_buffer(BEST_HER_PATH)

    
    else:
        print("No saved model found")
        # model = TQC(**model_kwargs)

    return model



model = load_model(env,device)

os.environ["MUJOCO_GL"] = "osmesa"
os.environ["PYOPENGL_PLATFORM"] = "osmesa"

video_folder = os.path.join(BASE_DIR, "videos")
os.makedirs(video_folder, exist_ok=True)
video_length = 1000

eval_env = DummyVecEnv([lambda: gym.make(env_id, render_mode="rgb_array")])
eval_env = VecVideoRecorder(
    eval_env,
    video_folder,
    record_video_trigger=lambda step: step == 0,
    video_length=video_length,
    name_prefix=f"tqc-agent-{env_id}"
)

obs = eval_env.reset()

successes = []
episode_success = []

for step in range(video_length):
    action, _ = model.predict(obs, deterministic=True)   # TQC actions
    obs, rewards, dones, infos = eval_env.step(action)

    # Each env in DummyVecEnv returns a list, so we take index [0]
    if "is_success" in infos[0]:
        episode_success.append(infos[0]["is_success"])

    if dones[0]:
        # Store success at the end of episode
        if len(episode_success) > 0:
            successes.append(float(episode_success[-1]))
        episode_success = []
        obs = eval_env.reset()

eval_env.close()

# =========================================================
# Report success rate
# =========================================================
if successes:
    success_rate = sum(successes) / len(successes)
    print(f"Evaluated {len(successes)} episodes")
    print(f"Success Rate: {success_rate*100:.2f}%")
else:
    print("No completed episodes during evaluation")
    
    

This code first defines paths for the saved model (best_model.zip) and its replay buffer, then loads them if available using TQC.load and model.load_replay_buffer. If no model is found, it simply prints a message and returns None, in which case there is nothing to evaluate. For evaluation, it creates a separate environment (eval_env) with render_mode="rgb_array" and wraps it in VecVideoRecorder, which is responsible for capturing and saving videos. The agent interacts with this environment for a fixed number of steps (video_length=1000), taking actions with model.predict in deterministic mode. During each step, the code checks for the is_success flag in the environment’s info dictionary to track success per episode, storing the result once an episode ends. Finally, it reports the overall success rate and automatically saves the video in the specified folder. For recording, the necessary steps are: enabling render_mode="rgb_array" in the environment, wrapping it with VecVideoRecorder, setting a trigger (record_video_trigger), and providing a video_folder where the video will be saved.
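
As an aside, the evaluate_policy utility imported earlier offers a quicker way to get a mean-reward estimate over complete episodes, without the manual success bookkeeping. A minimal sketch:

from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
import gymnasium as gym

# Fresh, non-recording evaluation environment
plain_eval_env = Monitor(gym.make(env_id))

mean_reward, std_reward = evaluate_policy(
    model, plain_eval_env, n_eval_episodes=10, deterministic=True
)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")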

Github