F1Tenth - 16299 Project

by jl24 in Circuits > Robots


The goal of this project was to implement a program that lets an autonomous race car drive while avoiding dynamic obstacles. The project was built on the F1Tenth platform (now rebranded as Roboracer), an open-source project started at the University of Pennsylvania and now used at schools worldwide, with competitions held at some of the biggest robotics conferences, such as ICRA.


For this project specifically, RRT (Rapidly-exploring Random Tree) was used. While there are better-performing algorithms out there, RRT is relatively easy to implement, although still nontrivial.
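Before diving into the full ROS2 node below, here is a minimal, framework-free sketch of the core RRT loop to fix the ideas. Everything in it is illustrative: the is_occupied check is a placeholder you would supply, the 3 m x 2 m sampling box, step size, and goal tolerance only roughly mirror the parameters used later, and for brevity it only collision-checks the sampled nodes, not the edges between them (the full node below does both).

import random, math

def rrt(start, goal, is_occupied, max_iters=1000, step=0.3, goal_tol=0.25):
    """Grow a tree from start until a node lands within goal_tol of goal."""
    nodes = [start]          # each node is an (x, y) tuple
    parents = {0: None}      # index of each node's parent in `nodes`

    for _ in range(max_iters):
        # 1. sample a random point in the free space (here, a 3 m x 2 m box)
        sample = (random.uniform(0.0, 3.0), random.uniform(-1.0, 1.0))

        # 2. find the tree node nearest to the sample
        near_i = min(range(len(nodes)), key=lambda i: math.dist(nodes[i], sample))

        # 3. steer: move at most `step` metres from that node toward the sample
        dx, dy = sample[0] - nodes[near_i][0], sample[1] - nodes[near_i][1]
        d = math.hypot(dx, dy)
        scale = min(1.0, step / d) if d > 0 else 0.0
        new = (nodes[near_i][0] + scale * dx, nodes[near_i][1] + scale * dy)

        # 4. reject the new node if it lands inside an obstacle
        if is_occupied(new):
            continue
        parents[len(nodes)] = near_i
        nodes.append(new)

        # 5. done once the newest node is within tolerance of the goal
        if math.dist(new, goal) < goal_tol:
            path, i = [], len(nodes) - 1
            while i is not None:      # walk parent pointers back to the root
                path.append(nodes[i])
                i = parents[i]
            return list(reversed(path))

    return None  # no path found within the iteration budget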

Setting Up the Simulation

While you would normally need a native Linux machine to run ROS2 and the simulation, Docker makes it easy for anyone to use a Linux instance. Put simply, Docker lets you run containers from pre-built images that already come with everything you need, including the operating system and any packages you might require.

https://github.com/f1tenth/f1tenth_gym_ros

The link above is a guide to getting the simulation set up on any machine. Follow the steps under "Without an NVIDIA GPU" if you don't have a native Linux machine. Once done, you should be able to run the simulation and teleop the car yourself.

Add the RRT Node

[Figure: high-level overview of the RRT algorithm]

The picture above gives a high-level description of the RRT algorithm. You can find an in-depth explanation of the algorithm, as well as some of its variants, in the paper here. In your workspace, clone the template package for the RRT node from here. It contains some starter code for RRT as well as all the dependencies needed for RRT, already set up for you. You can use the code provided below and put it in the rrt_node.py file. If you want to run the code, keep in mind that you need to build and then source the workspace. If you are unfamiliar with ROS2, the official docs give a comprehensive guide to its different components, such as what publishers and subscribers are, how to create and run nodes, and more. Example RRT code is provided below.

#!/usr/bin/env python3
"""
This file contains the class definition for tree nodes and RRT
Before you start, please read: https://arxiv.org/pdf/1105.1186.pdf
"""
import numpy as np
from numpy import linalg as LA
import math

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import PoseStamped
from geometry_msgs.msg import PointStamped
from geometry_msgs.msg import Pose
from geometry_msgs.msg import Point
from geometry_msgs.msg import PoseWithCovarianceStamped
from nav_msgs.msg import Odometry, Path
from ackermann_msgs.msg import AckermannDriveStamped, AckermannDrive
from nav_msgs.msg import OccupancyGrid
from visualization_msgs.msg import Marker

# TODO: import as you need
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
import scipy

import time

# class def for tree nodes
# It's up to you if you want to use this
class TreeNode(object):
    def __init__(self):
        self.x = None
        self.y = None
        self.parent = None
        self.cost = None # only used in RRT*
        self.is_root = False

# class def for RRT
class RRT(Node):
    def __init__(self):
        super().__init__('rrt')
        # topics, not saved as attributes
        # TODO: grab topics from param file, you'll need to change the yaml file
        pose_topic = "/pf/viz/inferred_pose" # TBD: will need to be updated
        scan_topic = "/scan"
        drive_topic = "/drive"
        og_topic = "/dynamic_map"

        # you could add your own parameters to the rrt_params.yaml file,
        # and get them here as class attributes as shown above.

        # TODO: create subscribers
        self.pose_sub_ = self.create_subscription( # TBD: will need to be updated
            PoseStamped,
            pose_topic,
            self.pose_callback,
            1)

        self.scan_sub_ = self.create_subscription(
            LaserScan,
            scan_topic,
            self.scan_callback,
            1)

        # publishers
        # TODO: create a drive message publisher, and other publishers that you might need
        self.drive_pub_ = self.create_publisher(AckermannDriveStamped, drive_topic, 1)
        self.og_pub_ = self.create_publisher(OccupancyGrid, og_topic, 1)

        # debugging
        self.tree_nodes_pub_ = self.create_publisher(Marker, '/tree_nodes', 10)
        self.path_pub_ = self.create_publisher(Path, '/found_path', 10)
        self.marker1_pub_ = self.create_publisher(Marker, '/goal_waypoint', 1)
        self.marker2_pub_ = self.create_publisher(Marker, '/local_target', 1)
        self.marker3_pub_ = self.create_publisher(Marker, '/samples', 1)

        # class attributes

        # occupancy grid attributes
        self.og_height = 2.0 # m
        self.og_width = 3.0 # m
        self.og_resolution = 0.05 # m
        self.kernel_size = 9

        # odometry attributes (global - map frame)
        self.x_current = 0.0
        self.y_current = 0.0
        self.heading_current = 0.0

        # global planner parameters
        self.x_current_goal = 0.0
        self.y_current_goal = 0.0
        self.global_waypoints = np.genfromtxt('/home/team5/f1tenth_ws/src/HMPC/waypoints/practice1_waypoints.csv', delimiter=',')
        self.global_waypoints = self.global_waypoints[:, 0 : 2]

        # physical car attributes
        self.base_to_lidar = 0.27275 # m
        self.edge_to_lidar = 0.10743 # m

        # RRT parameters
        self.max_rrt_iterations = 1000
        self.lookahead_distance = 2.0 # m
        self.steer_range = 0.3 # m
        self.goal_tolerance = 0.25 # m
        self.collision_checking_points = 20

        # RRT* parameters
        self.enable_rrt_star = False
        self.search_radius = 2.0 # m

        # sampling parameters
        self.sample_bias = 0.3
        self.std_deviation = 0.5

        # pure pursuit parameters
        self.clamp_angle = 25.0 # deg
        self.steering_gain = 1.0

        # initialize occupancy grid
        self.occupancy_grid = OccupancyGrid()
        self.init_occupancy_grid()


    def init_occupancy_grid(self):
        """
        Initialize occupancy grid
        """
        rows = int(self.og_height // self.og_resolution)
        cols = int(self.og_width // self.og_resolution)
        self.occupancy_grid.header.frame_id = "laser"
        self.occupancy_grid.info.width = cols
        self.occupancy_grid.info.height = rows
        self.occupancy_grid.info.resolution = self.og_resolution
        self.occupancy_grid.header.stamp = self.get_clock().now().to_msg()
        data = np.full((rows, cols), -1, np.int8)
        self.occupancy_grid.data = data.flatten().tolist()
        # self.occupancy_grid.info.origin.position.x = -self.base_to_lidar
        self.occupancy_grid.info.origin.position.x = 0.0
        self.occupancy_grid.info.origin.position.y = -(rows // 2) * self.og_resolution
        self.occupancy_grid.info.origin.position.z = 0.0
        self.og_pub_.publish(self.occupancy_grid)

    def convert_map_to_base(self, x_map, y_map):
        """
        Converts map coordinates to base link frame coordinates
        """
        x_base_rot = x_map - self.x_current
        y_base_rot = y_map - self.y_current
        angle = -self.heading_current
        rot_matrix = [[np.cos(angle), -np.sin(angle)],
                      [np.sin(angle), np.cos(angle)]]
        [x_grid, y_grid] = np.matmul(rot_matrix, [x_base_rot, y_base_rot])

        return (x_grid, y_grid)

    def convert_base_to_map(self, x_base, y_base):
        """
        Converts base link frame coordinates to map frame
        """
        # rotate to parallel to map frame
        rot_matrix = [[np.cos(self.heading_current), -np.sin(self.heading_current)],
                      [np.sin(self.heading_current), np.cos(self.heading_current)]]
        [x_base_rot, y_base_rot] = np.matmul(rot_matrix, [x_base, y_base])

        # translate by odom
        x_map = x_base_rot + self.x_current
        y_map = y_base_rot + self.y_current

        return (x_map, y_map)

    def is_occupied(self, x_grid, y_grid):
        """
        Checks if lidar coordinate x, y is occupied in occupancy grid
        """
        # get corresponding cell in occupancy grid
        row = int(self.occupancy_grid.info.height // 2 + y_grid // self.og_resolution)
        col = int(x_grid // self.og_resolution)

        if (row < 0 or col < 0):
            return False

        if (row >= self.occupancy_grid.info.height or col >= self.occupancy_grid.info.width):
            return False

        og_index = int(row * self.occupancy_grid.info.width + col)

        if (self.occupancy_grid.data[og_index] > 0):
            return True
        else:
            return False

    def get_next_goal(self):
        """
        Gets next global planner waypoint to get to using RRT
        """
        best_index = -1
        best_goal_distance = 10000.0
        for i in range(len(self.global_waypoints)):
            global_x = self.global_waypoints[i][0]
            global_y = self.global_waypoints[i][1]
            (global_x_grid, global_y_grid) = self.convert_map_to_base(global_x, global_y)

            if (global_x_grid < self.base_to_lidar):
                # goal behind car, skip
                continue

            goal_dist = np.abs(self.lookahead_distance - np.sqrt(global_x_grid**2 + global_y_grid**2))

            if (goal_dist < best_goal_distance):
                # make sure it is not an occupied point
                # if (self.is_occupied(global_x_grid, global_y_grid)):
                #     continue

                best_goal_distance = goal_dist
                best_index = i

        return (self.global_waypoints[best_index][0], self.global_waypoints[best_index][1])

    def display_marker(self, frame, r, g, b, current_waypoint):
        marker = Marker()
        marker.header.frame_id = frame
        marker.ns = "marker"
        marker.header.stamp = self.get_clock().now().to_msg()
        marker.id = 0
        marker.type = 2
        marker.action = 0
        marker.pose.position.x = current_waypoint[0]
        marker.pose.position.y = current_waypoint[1]
        marker.pose.position.z = 0.0
        marker.pose.orientation.w = 1.0
        marker.scale.x = 0.25
        marker.scale.y = 0.25
        marker.scale.z = 0.1
        marker.color.a = 1.0
        marker.color.r = r
        marker.color.g = g
        marker.color.b = b
        return marker

    def visualize_tree(self, tree):
        tree_nodes = Marker()
        tree_nodes.header.frame_id = "map"
        tree_nodes.ns = "marker"
        tree_nodes.header.stamp = self.get_clock().now().to_msg()
        tree_nodes.id = 1
        tree_nodes.type = 8
        tree_nodes.action = 0
        tree_nodes.scale.x = 0.1
        tree_nodes.scale.y = 0.1
        tree_nodes.scale.z = 0.1
        tree_nodes.color.a = 1.0
        tree_nodes.color.r = 0.0
        tree_nodes.color.g = 0.0
        tree_nodes.color.b = 1.0

        for node in tree:
            point = Point()
            point.x = node.x
            point.y = node.y
            point.z = 0.0
            tree_nodes.points.append(point)

        self.tree_nodes_pub_.publish(tree_nodes)
        tree_nodes.points.clear()

    def visualize_path(self, path):
        path_msg = Path()
        path_msg.header.stamp = self.get_clock().now().to_msg()
        path_msg.header.frame_id = "map"
        for node in path:
            loc = PoseStamped()
            loc.header.stamp = self.get_clock().now().to_msg()
            loc.header.frame_id = "map"
            loc.pose.position.x = node.x
            loc.pose.position.y = node.y
            loc.pose.position.z = 0.0
            path_msg.poses.append(loc)

        self.path_pub_.publish(path_msg)


    def preprocess_lidar(self, ranges):
        pass

    def get_speed(self, angle):
        abs_angle = np.abs(angle)
        if abs_angle >= np.deg2rad(15):
            speed = 0.75
        elif abs_angle >= np.deg2rad(10):
            speed = 1.0
        elif abs_angle >= np.deg2rad(5):
            speed = 1.5
        else:
            speed = 2.0
        # speed = 0.5
        return speed
        # return 0.0

    def inflate_obstacles(self, kernel):
        height = self.occupancy_grid.info.height
        width = self.occupancy_grid.info.width

        # Get kernel dimensions
        k_height, k_width = kernel.shape

        # Compute padding for the input image
        pad_height = k_height // 2
        pad_width = k_width // 2

        og_grid_data = np.array(self.occupancy_grid.data)
        og_grid_data = og_grid_data.reshape((height, width))

        # Create an empty output image
        dilated_image = np.zeros_like(og_grid_data)

        # Pad the input image
        padded_image = np.pad(og_grid_data, ((pad_height, pad_height), (pad_width, pad_width)), mode='constant')

        # Apply dilation
        for y in range(height):
            for x in range(width):
                neighborhood = padded_image[y:y + k_height, x:x + k_width]
                dilated_image[y, x] = np.max(neighborhood * kernel)

        self.occupancy_grid.data = dilated_image.flatten().tolist()

    def scan_callback(self, scan_msg):
        """
        LaserScan callback, you should update your occupancy grid here

        Args:
            scan_msg (LaserScan): incoming message from subscribed topic
        Returns:
        """
        ranges = np.array(scan_msg.ranges)
        rows = self.occupancy_grid.info.height
        cols = self.occupancy_grid.info.width

        proc_ranges = np.copy(ranges)
        proc_ranges[proc_ranges < scan_msg.range_min] = scan_msg.range_min
        proc_ranges[proc_ranges > scan_msg.range_max] = scan_msg.range_max
        proc_ranges[np.isnan(proc_ranges) | np.isinf(proc_ranges)] = scan_msg.range_max

        # self.preprocess_lidar(proc_ranges)

        # Create meshgrid of row and col indices
        col_indices, row_indices = np.meshgrid(np.arange(cols), np.arange(rows))

        # Calculate x and y coordinates for each cell
        x_cell = col_indices * self.og_resolution + (self.og_resolution / 2)
        y_cell = (row_indices - (rows / 2)) * self.og_resolution + (self.og_resolution / 2)

        # Calculate distance to each cell
        distance_to_cell = np.sqrt(x_cell**2 + y_cell**2)

        # Calculate angle to each cell
        angle_to_cell = np.arctan2(y_cell, x_cell)

        # Find closest index in scan_msg for each cell
        closest_index = ((angle_to_cell - scan_msg.angle_min) / scan_msg.angle_increment).astype(int)

        # Get distance to object for each cell
        distance_to_obj = proc_ranges[closest_index]

        # Create occupancy grid data
        occupancy_grid_data = np.where(distance_to_cell >= distance_to_obj, 100, 0)

        # Flatten the occupancy grid data and assign it to self.occupancy_grid.data
        self.occupancy_grid.data = occupancy_grid_data.flatten().tolist()

        kernel = np.ones((self.kernel_size, self.kernel_size))
        self.inflate_obstacles(kernel)
        self.occupancy_grid.header.stamp = self.get_clock().now().to_msg()

        self.og_pub_.publish(self.occupancy_grid)


    def pose_callback(self, pose_msg):
        """
        The pose callback when subscribed to particle filter's inferred pose
        Here is where the main RRT loop happens

        Args:
            pose_msg (PoseStamped): incoming message from subscribed topic
        Returns:

        """
        # TBD: will need to be updated for running on car
        self.x_current = pose_msg.pose.position.x
        self.y_current = pose_msg.pose.position.y

        # convert rotation quaternion to heading angle
        q = [pose_msg.pose.orientation.w, pose_msg.pose.orientation.x,
             pose_msg.pose.orientation.y, pose_msg.pose.orientation.z]
        sin_angle = 2.0 * (q[0] * q[3] + q[1] * q[2])
        cos_angle = 1.0 - 2.0 * (q[2]**2 + q[3]**2)
        self.heading_current = np.arctan2(sin_angle, cos_angle)

        # update next goal global waypoint
        (self.x_current_goal, self.y_current_goal) = self.get_next_goal()
        (goal_x_grid, goal_y_grid) = self.convert_map_to_base(self.x_current_goal, self.y_current_goal)
        dist_to_goal = (self.x_current - self.x_current_goal)**2 + (self.y_current - self.y_current_goal)**2

        if (self.is_occupied(goal_x_grid, goal_y_grid)):
            print("goal occupied")
            self.goal_tolerance = 0.9
        else:
            self.goal_tolerance = 0.25

        # debugging
        p_map = self.display_marker("map", 1.0, 0.0, 0.0, [self.x_current_goal, self.y_current_goal])
        self.marker1_pub_.publish(p_map)

        # define starter node
        start_node = TreeNode()
        start_node.x = self.x_current
        start_node.y = self.y_current
        start_node.is_root = True
        start_node.parent = 0
        start_node.cost = 0.0

        # initialize tree
        tree = [start_node]

        for iter in range(self.max_rrt_iterations):
            sampled_point = self.sample()
            dist_to_sample = (self.x_current - sampled_point[0])**2 + (self.y_current - sampled_point[1])**2

            (sample_x_grid, sample_y_grid) = self.convert_map_to_base(sampled_point[0], sampled_point[1])
            if ((self.is_occupied(sample_x_grid, sample_y_grid)) or (dist_to_sample > dist_to_goal)):
                continue

            # debugging
            # grid_point = self.convert_map_to_base(sampled_point[0], sampled_point[1])
            # p_grid = self.display_marker("ego_racecar/laser", 0.0, 1.0, 0.0, [grid_point[0], grid_point[1]])
            # self.marker2_pub_.publish(p_grid)

            nearest_indx = self.nearest(tree, sampled_point)

            new_node = self.steer(tree[nearest_indx], sampled_point)
            (new_x_grid, new_y_grid) = self.convert_map_to_base(new_node.x, new_node.y)
            if (self.is_occupied(new_x_grid, new_y_grid)):
                continue

            new_node.parent = nearest_indx
            current_node_index = len(tree)

            if (not self.check_collision(tree[nearest_indx], new_node)):
                # run RRT* if enabled
                if (self.enable_rrt_star):
                    new_node.cost = self.cost(tree, new_node)
                    near_neighbour_indices = self.near(tree, new_node)
                    # initialize array of bools of neighbours which collide
                    neighbour_collisions = []
                    best_neighbour = new_node.parent

                    for near_node_index in near_neighbour_indices:
                        # check which neighbours collide
                        if (self.check_collision(tree[near_node_index], new_node)):
                            neighbour_collisions.append(True)
                            continue

                        neighbour_collisions.append(False)
                        # update best neighbour based on cost
                        cost_value = tree[near_node_index].cost + self.line_cost(tree[near_node_index], new_node)
                        if (cost_value < new_node.cost):
                            new_node.cost = cost_value
                            new_node.parent = near_node_index
                            best_neighbour = near_node_index

                    for i in range(len(near_neighbour_indices)):
                        # skip the chosen parent and any neighbour that collides with the new node
                        if (near_neighbour_indices[i] == best_neighbour) or (neighbour_collisions[i]):
                            continue

                        near_neighbour = tree[near_neighbour_indices[i]]
                        if (near_neighbour.cost > new_node.cost + self.line_cost(new_node, near_neighbour)):
                            # rewire
                            near_neighbour.parent = current_node_index

                if ((self.is_occupied(new_x_grid, new_y_grid))):
                    p_map = self.display_marker("map", 0.0, 1.0, 0.0, [new_node.x, new_node.y])
                    self.marker3_pub_.publish(p_map)

                tree.append(new_node)

                if (self.is_goal(new_node, self.x_current_goal, self.y_current_goal)):
                    # close enough to goal
                    path = self.find_path(tree, new_node)
                    self.track_path(path)
                    # self.visualize_tree(tree)
                    self.visualize_path(path)
                    break


    def sample(self):
        """
        This method should randomly sample the free space, and returns a viable point

        Args:
        Returns:
            (x, y) (float float): a tuple representing the sampled point

        """
        # x = np.random.uniform(0.0, self.og_width) # lidar frame
        # y = np.random.uniform(-self.og_height / 2, self.og_height / 2) # lidar frame

        (goal_grid_x, goal_grid_y) = self.convert_map_to_base(self.x_current_goal, self.y_current_goal)

        if np.random.rand() < self.sample_bias:
            # Sample near the goal point using Gaussian distribution
            x = np.random.normal(goal_grid_x, self.std_deviation)
            y = np.random.normal(goal_grid_y, self.std_deviation)
        else:
            # Sample uniformly in the free space
            x = np.random.uniform(0.0, self.og_width) # lidar frame
            y = np.random.uniform(-self.og_height / 2, self.og_height / 2) # lidar frame

        # if ((not self.is_occupied(x, y)) and (x <= goal_grid_x) and (x >= self.base_to_lidar)):
        #     # convert to map coordinates from grid coordinates

        (x_map, y_map) = self.convert_base_to_map(x, y)

        return (x_map, y_map)
        # else:
        #     return self.sample()

    def nearest(self, tree, sampled_point):
        """
        This method should return the nearest node on the tree to the sampled point

        Args:
            tree ([]): the current RRT tree
            sampled_point (tuple of (float, float)): point sampled in free space
        Returns:
            nearest_node (int): index of nearest node on the tree
        """
        nearest_node = 0
        # min_dist = 10000.0
        # for i in range(len(tree)):
        #     node = tree[i]
        #     sq_dist = (sampled_point[0] - node.x)**2 + (sampled_point[1] - node.y)**2
        #     if (sq_dist < min_dist):
        #         nearest_node = i
        #         min_dist = sq_dist
        sampled_point = np.array(sampled_point)
        tree_points = np.array([(node.x, node.y) for node in tree])

        # Calculate squared distances between sampled point and all tree nodes
        sq_distances = np.sum((tree_points - sampled_point)**2, axis=1)

        # Find the index of the node with the minimum squared distance
        nearest_node = np.argmin(sq_distances)

        return nearest_node

    def steer(self, nearest_node, sampled_point):
        """
        This method should return a point in the viable set such that it is closer
        to the nearest_node than sampled_point is.

        Args:
            nearest_node (Node): nearest node on the tree to the sampled point
            sampled_point (tuple of (float, float)): sampled point
        Returns:
            new_node (Node): new node created from steering
        """
        new_node = TreeNode()
        dist = np.sqrt((sampled_point[0] - nearest_node.x)**2 + (sampled_point[1] - nearest_node.y)**2)
        x = sampled_point[0] - nearest_node.x
        y = sampled_point[1] - nearest_node.y

        if (dist < self.steer_range):
            new_node.x = sampled_point[0]
            new_node.y = sampled_point[1]
        else:
            theta = np.arctan2(y, x)
            new_node.x = nearest_node.x + np.cos(theta) * self.steer_range
            new_node.y = nearest_node.y + np.sin(theta) * self.steer_range
        # new_node.x = nearest_node.x + min(self.steer_range, dist) * (sampled_point[0] - nearest_node.x) / dist
        # new_node.y = nearest_node.y + min(self.steer_range, dist) * (sampled_point[1] - nearest_node.y) / dist
        return new_node

    def check_collision(self, nearest_node, new_node):
        """
        This method should return whether the path between nearest and new_node is
        collision free.

        Args:
            nearest (Node): nearest node on the tree
            new_node (Node): new node from steering
        Returns:
            collision (bool): whether the path between the two nodes are in collision
                              with the occupancy grid
        """
        collision = False

        # step along the straight line between the two nodes and check
        # evenly spaced intermediate points against the occupancy grid
        dt = 1.0 / self.collision_checking_points
        t = 0.0

        for i in range(self.collision_checking_points + 1):
            x = nearest_node.x + t * (new_node.x - nearest_node.x)
            y = nearest_node.y + t * (new_node.y - nearest_node.y)

            # convert map to grid coordinates to check if occupied
            (x_grid, y_grid) = self.convert_map_to_base(x, y)

            if (self.is_occupied(x_grid, y_grid)):
                collision = True
                break

            t += dt

        return collision


    def is_goal(self, latest_added_node, goal_x, goal_y):
        """
        This method should return whether the latest added node is close enough
        to the goal.

        Args:
            latest_added_node (Node): latest added node on the tree
            goal_x (double): x coordinate of the current goal
            goal_y (double): y coordinate of the current goal
        Returns:
            close_enough (bool): true if node is close enough to the goal
        """
        distance = np.sqrt((latest_added_node.x - goal_x)**2 + (latest_added_node.y - goal_y)**2)
        return distance < self.goal_tolerance

    def find_path(self, tree, latest_added_node):
        """
        This method returns a path as a list of Nodes connecting the starting point to
        the goal once the latest added node is close enough to the goal

        Args:
            tree ([]): current tree as a list of Nodes
            latest_added_node (Node): latest added node in the tree
        Returns:
            path ([]): valid path as a list of Nodes
        """
        path = []
        current = latest_added_node
        while (not current.is_root):
            path.append(current)
            current = tree[current.parent]
        path.append(current)
        path.reverse()

        (goal_x_grid, goal_y_grid) = self.convert_map_to_base(self.x_current_goal, self.y_current_goal)
        if (not self.is_occupied(goal_x_grid, goal_y_grid)):
            goal_node = TreeNode()
            goal_node.x = self.x_current_goal
            goal_node.y = self.y_current_goal
            path.append(goal_node)

        return path

    def track_path(self, path):
        """
        Finds node in path just within lookahead distance and follows pure pursuit
        """
        best_index = 0
        closest_distance = 10000.0
        closest_distance_current_pose = 10000.0
        for i in range(len(path)):
            x = path[i].x
            y = path[i].y
            # Euclidean distance from the current pose to this path node
            dist = np.sqrt((self.x_current - x) ** 2 + (self.y_current - y) ** 2)
            diff_distance = np.abs(self.lookahead_distance - dist)
            # if (dist < self.lookahead_distance ** 2):
            #     best_index = i
            if (diff_distance < closest_distance):
                closest_distance = diff_distance
                best_index = i
                closest_distance_current_pose = dist

        # get next point for pure pursuit using average
        p1 = np.array([path[best_index].x, path[best_index].y])
        # p2 = np.array([path[min(best_index + 1, len(path) - 1)].x, path[min(best_index + 1, len(path) - 1)].y])
        # avg_target_map = (p1 + p2) / 2.0
        avg_target_map = p1
        avg_target_base = self.convert_map_to_base(avg_target_map[0], avg_target_map[1])

        if (self.is_occupied(avg_target_base[0], avg_target_base[1])):
            print("target occupied")

        # debugging
        p_map = self.display_marker("map", 0.0, 0.0, 1.0, [avg_target_map[0], avg_target_map[1]])
        self.marker2_pub_.publish(p_map)

        # calculate curvature/steering angle
        # angle = (2 * avg_target_base[1]) / (self.lookahead_distance ** 2)
        angle = (2 * avg_target_base[1]) / (closest_distance_current_pose ** 2)
        angle = np.clip(angle, -np.deg2rad(self.clamp_angle), np.deg2rad(self.clamp_angle))
        angle = self.steering_gain * angle

        drive_msg = AckermannDriveStamped()
        drive_msg.drive.steering_angle = angle
        drive_msg.drive.speed = self.get_speed(angle)

        print("steering at angle: ", np.rad2deg(angle))

        self.drive_pub_.publish(drive_msg)


    # The following methods are needed for RRT* and not RRT
    def cost(self, tree, node):
        """
        This method should return the cost of a node

        Args:
            node (Node): the current node the cost is calculated for
        Returns:
            cost (float): the cost value of the node
        """
        return tree[node.parent].cost + self.line_cost(tree[node.parent], node)

    def line_cost(self, n1, n2):
        """
        This method should return the cost of the straight line between n1 and n2

        Args:
            n1 (Node): node at one end of the straight line
            n2 (Node): node at the other end of the straight line
        Returns:
            cost (float): the cost value of the line
        """
        return np.sqrt((n1.x - n2.x)**2 + (n1.y - n2.y)**2)

    def near(self, tree, node):
        """
        This method should return the neighborhood of nodes around the given node

        Args:
            tree ([]): current tree as a list of Nodes
            node (Node): current node we're finding neighbors for
        Returns:
            neighborhood ([]): neighborhood of nodes as a list of Nodes
        """
        neighborhood = []
        for i in range(len(tree)):
            distance = (node.x - tree[i].x)**2 + (node.y - tree[i].y)**2
            if (distance < self.search_radius**2):
                neighborhood.append(i)

        return neighborhood


def main(args=None):
    rclpy.init(args=args)
    print("RRT Initialized")
    rrt_node = RRT()
    rclpy.spin(rrt_node)

    rrt_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Create Waypoints

We can write a new script that listens to the clicked points topic. This lets you click various points in the RViz simulation and save them as a list of waypoints for the car to follow while running RRT. Example code is provided below.


#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PointStamped

import numpy as np
import sys
import signal
import csv
from scipy.interpolate import splprep, splev
import matplotlib.pyplot as plt

class RecordWaypointsNode(Node):
    def __init__(self):
        super().__init__('record_waypoints_node')

        # subscribe to clicked_points topic
        self.clicked_pts_sub = self.create_subscription(PointStamped, '/clicked_point', self.callback, 10)

        # clicked waypoints
        self.clicked_waypoints = np.empty((0, 2))

    def callback(self, msg):
        point_x = msg.point.x
        point_y = msg.point.y
        print("point: ", point_x, point_y)
        new_waypoint = np.array([[point_x, point_y]])
        self.clicked_waypoints = np.concatenate((self.clicked_waypoints, new_waypoint), axis=0)


def save_recorded_waypoints(waypoints, filename):
    with open(filename, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        for waypoint in waypoints:
            csv_writer.writerow(waypoint)


def interpolate_waypoints(waypoints):
    waypoints_x = waypoints[:, 0]
    waypoints_y = waypoints[:, 1]

    # fit a spline through the clicked points and evaluate it on a denser
    # parameter grid so the saved path is smoother than the raw clicks
    tck, u = splprep([waypoints_x, waypoints_y], s=0)
    u_dense = np.linspace(0.0, 1.0, 10 * len(waypoints_x))
    new_x, new_y = splev(u_dense, tck)

    new_points = np.vstack((new_x, new_y)).T

    fig, ax = plt.subplots()
    ax.plot(waypoints_x, waypoints_y, 'ro')
    ax.plot(new_x, new_y, 'r-')
    plt.savefig('waypoints_tepper.png')

    return new_points

def main(args=None):
    rclpy.init(args=args)
    print("Record Waypoints Initialized")
    record_waypoints_node = RecordWaypointsNode()

    def sigint_handler(sig, frame):
        print("Received termination signal. Shutting down...")
        record_waypoints_node.destroy_node()
        rclpy.shutdown()

        # record clicked waypoints and save to csv
        recorded_waypoints = record_waypoints_node.clicked_waypoints
        save_recorded_waypoints(recorded_waypoints, "waypoints_tepper.csv")

        # interpolate between waypoints and save to csv
        interpolated_waypoints = interpolate_waypoints(recorded_waypoints)
        save_recorded_waypoints(interpolated_waypoints, "interpolated_tepper.csv")

        sys.exit(0)

    signal.signal(signal.SIGINT, sigint_handler)

    rclpy.spin(record_waypoints_node)
    record_waypoints_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
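To record waypoints, run this node while the simulator and RViz are up, click points with RViz's "Publish Point" tool (which publishes to /clicked_point), and press Ctrl+C when you are done so the CSV files get written. As a quick sanity check, you can plot the saved file; this is a minimal sketch that assumes interpolated_tepper.csv (the filename hard-coded above) is in the current directory:

import numpy as np
import matplotlib.pyplot as plt

# load the interpolated waypoints written by the recorder above
waypoints = np.genfromtxt("interpolated_tepper.csv", delimiter=",")

# plot x/y in metres to make sure the path looks like the track you clicked
plt.plot(waypoints[:, 0], waypoints[:, 1], "b.-")
plt.axis("equal")
plt.xlabel("x (m)")
plt.ylabel("y (m)")
plt.title("Recorded waypoints")
plt.show()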



Run RRT in Sim

You can now run RRT in sim using a command like "ros2 run rrt rrt_node.py", adjusted to match the names of your package and node. An example of the car running in the sim is shown in the video below.
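Note that the RRT node above loads its global waypoints with np.genfromtxt from a hard-coded absolute path. One optional improvement, sketched here with standard ROS2 parameter calls (the parameter name waypoint_file is made up for illustration), is to expose that path as a parameter so you can point the node at the CSV you just recorded without editing the code:

# inside RRT.__init__(), replacing the hard-coded np.genfromtxt path
self.declare_parameter('waypoint_file', 'interpolated_tepper.csv')
waypoint_file = self.get_parameter('waypoint_file').get_parameter_value().string_value
self.global_waypoints = np.genfromtxt(waypoint_file, delimiter=',')[:, 0:2]

You could then run it with something like "ros2 run rrt rrt_node.py --ros-args -p waypoint_file:=/path/to/waypoints.csv", again adjusting the package and node names to your setup.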

Downloads

(Optional) Build a Car

Instructions for what materials you need and how to build and set up the vehicle can be found on the official Roboracer website. The total cost is around $3,500. Some of the more expensive parts include the Jetson Nano, the lidar, the VESC, and the chassis itself.

(Optional) Creating a Map

[Figure: map of part of the first floor of Tepper created with SLAM Toolbox]

Previously, in simulation, a default map was provided. If you want to run RRT on a physical car, you first need to create a map and a set of waypoints. To do so, install SLAM Toolbox on the car. Run SLAM Toolbox with RViz open while driving the car around with teleop, and you should see a map being built. Shown above is a map of part of the first floor of Tepper created using SLAM Toolbox. To create a set of waypoints, we can then use the same clicked-point method as earlier.

(Optional) Running RRT on the Physical Car

[GIF: RRT running on the physical car]

We first need to install a particle filter on the car. This allows the car to localize, i.e., figure out where it is on the map. In simulation this wasn't needed because the sim provides a ground-truth pose. Finally, we can run RRT on the physical car by running the RRT script and the particle filter together. You may need to change some of the topics that your RRT node publishes and subscribes to, since some of them differ between the sim and the physical car.
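For example, the node above subscribes to PoseStamped on /pf/viz/inferred_pose, which is what the particle filter provides, while the simulator publishes the ego car's pose as a nav_msgs/Odometry message (typically on /ego_racecar/odom in f1tenth_gym_ros; double-check with "ros2 topic list"). A minimal sketch of supporting both, assuming those topic names and a made-up use_sim parameter, is to select the subscription in __init__ and funnel both message types into the same pose-update logic:

# inside RRT.__init__(): pick the pose source depending on where you are running
self.declare_parameter('use_sim', True)
if self.get_parameter('use_sim').get_parameter_value().bool_value:
    # simulator publishes the ego pose as nav_msgs/Odometry
    self.pose_sub_ = self.create_subscription(
        Odometry, '/ego_racecar/odom', self.odom_callback, 1)
else:
    # particle filter publishes geometry_msgs/PoseStamped
    self.pose_sub_ = self.create_subscription(
        PoseStamped, '/pf/viz/inferred_pose', self.pose_callback, 1)

# added as a method of the RRT class: unwrap the Odometry message and reuse pose_callback
def odom_callback(self, odom_msg):
    pose_msg = PoseStamped()
    pose_msg.header = odom_msg.header
    pose_msg.pose = odom_msg.pose.pose
    self.pose_callback(pose_msg)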

(Optional) Next Steps

There are many ways this algorithm can be improved. There is plenty of room for optimization in the RRT code, such as its runtime, how it samples points, and much more. Another option would be to implement one of its variants, such as RRT*, which creates smoother paths at the cost of extra computation.