Using in a Neural Network

cuRobo’s differentiable functions (e.g., kinematics, signed distance) can be used as part of a neural network pipeline. As an example, we show how to use cuRobo’s functions as a layer in a neural network and also in a loss function.

digraph { rankdir=LR; edge [color = "#708090"; fontsize=10]; node [shape="box", style="rounded, filled", fontsize=12] nn [label="Neural Network", color="#cccccc"] input [label="Input", shape="plain", style="rounded"] output[label="Prediction", shape="plain", style="rounded"] cu_features[label="cuRobo Layer", color="#76b900",fontcolor="white"] input -> nn; input -> cu_features; cu_features -> nn; output -> loss [dir="both", style="dashed"]; nn -> output; subgraph cluster_training { label="Training"; style="dashed"; cu_loss[label="cuRobo Loss", color="#76b900",fontcolor="white"] loss [label="Loss Function", color="#cccccc"] loss -> cu_loss[dir="both",style="dashed"]; } }

The curobo.wrap.model.robot_world.RobotWorld provides a training friendly wrapper for computing kinematics, and signed distance. cuRobo computes signed distance between the robot and the world by first computing the position of the spheres that approximate the robot’s geometry in space given the joint configuration and then using a sphere to world signed distance function to compute the distance to obstacles.

digraph{ rankdir=LR; subgraph cluster_class{ label="RobotWorld"; edge [color ="#708090"; fontsize=10]; node [shape="box", color="#cccccc",style="rounded, filled", fontsize=12] get_kin[label="get_kinematics()",color="#76b900", fontcolor="white"] get_collision_distance[label="get_collision_distance()",color="#76b900", fontcolor="white"] get_self_coll[label="get_self_collision_distance()",color="#76b900", fontcolor="white"] q[label="joint angles", shape="plain", style="rounded"] ee_pose[label="Link Poses"] spheres[label="Robot Spheres"] world[label="World Collision Checker"] world_config[label="WorldConfig", style="rounded", shape="plain"] load_collision_model[label="load_collision_model()"] q -> get_kin; get_kin -> {ee_pose, spheres}; spheres -> {get_collision_distance, get_self_coll}; world -> get_collision_distance; world_config -> load_collision_model; load_collision_model -> world; } }

An instance of RobotWorld can be created as below, where world_config_file can be swapped with an instance of curobo.geom.types.WorldConfig that describes your world obstacles.

from curobo.wrap.model.robot_world import RobotWorld, RobotWorldConfig
robot_file = "franka.yml"
world_config_file = "collision_table.yml"
config = RobotWorldConfig.load_from_config(
    robot_file, world_config_file, pose_weight=[10, 200, 1, 10]
)
curobo_wrapper = RobotWorld(config)

Next, we will go over an example provided in examples/torch_layers_example.py. Import the following packages,

# Standard Library
import uuid
from typing import Optional

# Third Party
import numpy as np
import torch
from torch import nn
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

# cuRobo
from curobo.geom.sdf.world import WorldConfig
from curobo.geom.types import Mesh
from curobo.types.math import Pose
from curobo.types.state import JointState
from curobo.util.usd_helper import UsdHelper
from curobo.util_file import get_assets_path, get_world_configs_path, join_path, load_yaml
from curobo.wrap.model.robot_world import RobotWorld, RobotWorldConfig

We will create a class that holds the neural network and loss functions, subclassing torch.nn.Module, which takes as input an instance of curobo.wrap.model.robot_world.RobotWorld.

class CuroboTorch(torch.nn.Module):
    def __init__(self, robot_world: RobotWorld):
        """Build a simple structured NN:

        q_current -> kinematics -> sdf -> features
        [features, x_des] -> NN -> kinematics -> sdf -> [sdf, pose_distance] -> NN -> q_out
        loss = (fk(q_out) - x_des) + (q_current - q_out) + valid(q_out)
        """
        super(CuroboTorch, self).__init__()

        feature_dims = robot_world.kinematics.kinematics_config.link_spheres.shape[0] * 5 + 7 + 1
        q_feature_dims = 7
        final_feature_dims = feature_dims + 1 + 7
        output_dims = robot_world.kinematics.get_dof()

        # build neural network:
        self._robot_world = robot_world

        self._feature_mlp = nn.Sequential(
            nn.Linear(q_feature_dims, 512),
            nn.ReLU6(),
            nn.Linear(512, 512),
            nn.ReLU6(),
            nn.Linear(512, 512),
            nn.ReLU6(),
            nn.Linear(512, output_dims),
            nn.Tanh(),
        )
        self._final_mlp = nn.Sequential(
            nn.Linear(final_feature_dims, 256),
            nn.ReLU6(),
            nn.Linear(256, 256),
            nn.ReLU6(),
            nn.Linear(256, 64),
            nn.ReLU6(),
            nn.Linear(64, output_dims),
            nn.Tanh(),
        )

    def get_features(self, q: torch.Tensor, x_des: Optional[Pose] = None):
        """ Compute a cuRobo Layer that computes features given joint angle."""
        kin_state = self._robot_world.get_kinematics(q)
        spheres = kin_state.link_spheres_tensor.unsqueeze(2)
        q_sdf = self._robot_world.get_collision_distance(spheres)
        q_self = self._robot_world.get_collision_distance(
            kin_state.link_spheres_tensor.unsqueeze(1)
        )

        features = [
            kin_state.link_spheres_tensor.view(q.shape[0], -1),
            q_sdf,
            q_self,
            kin_state.ee_position,
            kin_state.ee_quaternion,
        ]
        if x_des is not None:
            pose_distance = self._robot_world.pose_distance(x_des, kin_state.ee_pose)
            features.append(pose_distance)
            features.append(x_des.position)
            features.append(x_des.quaternion)
        features = torch.cat(features, dim=-1)

        return features

    def forward(self, q: torch.Tensor, x_des: Pose):
        """Forward for neural network

        Args:
            q (torch.Tensor): _description_
            x_des (torch.Tensor): _description_
        """
        # get features for input:
        in_features = torch.cat([x_des.position, x_des.quaternion], dim=-1)
        # pass through initial mlp:
        q_mid = self._feature_mlp(in_features)

        q_scale = self._robot_world.bound_scale * q_mid
        # get new features:
        mid_features = self.get_features(q_scale, x_des=x_des)
        q_out = self._final_mlp(mid_features)
        q_out = self._robot_world.bound_scale * q_out
        return q_out

    def loss(self, x_des: Pose, q: torch.Tensor, q_in: torch.Tensor):
        """ Compute training loss """
        kin_state = self._robot_world.get_kinematics(q)
        distance = self._robot_world.pose_distance(x_des, kin_state.ee_pose)
        d_sdf = self._robot_world.collision_constraint(kin_state.link_spheres_tensor.unsqueeze(1))
        d_self = self._robot_world.self_collision_cost(kin_state.link_spheres_tensor.unsqueeze(1))
        loss = 0.1 * torch.linalg.norm(q_in - q, dim=-1) + distance + 100.0 * (d_self + d_sdf)
        return loss

    def val_loss(self, x_des: Pose, q: torch.Tensor, q_in: torch.Tensor):
        """ Compute validatin loss """
        kin_state = self._robot_world.get_kinematics(q)
        distance = self._robot_world.pose_distance(x_des, kin_state.ee_pose)
        d_sdf = self._robot_world.collision_constraint(kin_state.link_spheres_tensor.unsqueeze(1))
        d_self = self._robot_world.self_collision_cost(kin_state.link_spheres_tensor.unsqueeze(1))
        loss = 10.0 * (d_self + d_sdf) + distance
        return loss

We train a network to output collision-free inverse kinematics given a goal pose below,

if __name__ == "__main__":
    update_goal = False
    write_usd = False
    writer = SummaryWriter("log/runs/ik/" + str(uuid.uuid4()))
    robot_file = "franka.yml"
    world_file = "collision_table.yml"
    config = RobotWorldConfig.load_from_config(
        robot_file, world_file, pose_weight=[10, 200, 1, 10]
    )
    curobo_fn = RobotWorld(config)

    model = CuroboTorch(curobo_fn)
    model.cuda()
    with torch.no_grad():
        # q_train = curobo_fn.sample(10000)
        q_val = curobo_fn.sample(100)
        q_train = curobo_fn.sample(5 * 2048)
    usd_list = []
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-8)
    batch_size = 512
    batch_start = torch.arange(0, q_train.shape[0], batch_size)
    with torch.no_grad():
        x_des = curobo_fn.get_kinematics(q_train[1:2]).ee_pose
        x_des_train = curobo_fn.get_kinematics(q_train).ee_pose
        x_des_val = x_des.repeat(q_val.shape[0])
    q_debug = []
    bar = tqdm(range(500))
    for e in bar:
        model.train()
        for j in range(batch_start.shape[0]):
            x_train = q_train[batch_start[j] : batch_start[j] + batch_size]
            if x_train.shape[0] != batch_size:
                continue
            x_des_batch = x_des_train[batch_start[j] : batch_start[j] + batch_size]
            q = model.forward(x_train, x_des_batch)
            loss = model.loss(x_des_batch, q, x_train)
            loss = torch.mean(loss)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        writer.add_scalar("training loss", loss.item(), e)
        model.eval()
        with torch.no_grad():
            q_pred = model.forward(q_val, x_des_val)
            val_loss = model.val_loss(x_des_val, q_pred, q_val)
            val_loss = torch.mean(val_loss)
            q_debug.append(q_pred[0:1].clone())
        writer.add_scalar("validation loss", val_loss.item(), e)
        bar.set_description("t: " + str(val_loss.item()))
        if e % 100 == 0 and len(q_debug) > 1:
            if write_usd: # store ik solution in a animated usd.
                q_traj = torch.cat(q_debug, dim=0)
                world_model = WorldConfig.from_dict(
                    load_yaml(join_path(get_world_configs_path(), world_file))
                )
                gripper_mesh = Mesh(
                    name="target_gripper",
                    file_path=join_path(
                        get_assets_path(),
                        "robot/franka_description/meshes/visual/hand_ee_link.dae",
                    ),
                    color=[0.0, 0.8, 0.1, 1.0],
                    pose=x_des[0].tolist(),
                )
                world_model.add_obstacle(gripper_mesh)
                save_name = "e_" + str(e)
                UsdHelper.write_trajectory_animation_with_robot_usd(
                    "franka.yml",
                    world_model,
                    JointState(position=q_traj[0]),
                    JointState(position=q_traj),
                    dt=1.0,
                    visualize_robot_spheres=False,
                    save_path=save_name + ".usd",
                    base_frame="/" + save_name,
                )
                usd_list.append(save_name + ".usd")
            if update_goal:
                with torch.no_grad():
                    rand_perm = torch.randperm(q_val.shape[0])
                    q_val = q_val[rand_perm].clone()
                    x_des = curobo_fn.get_kinematics(q_val[0:1]).ee_pose
                    x_des_val = x_des.repeat(q_val.shape[0])
            q_debug = []

    # create a grid of animated usds:
    if write_usd:
        UsdHelper.create_grid_usd(
            usd_list,
            "epoch_grid.usd",
            "/world",
            max_envs=len(usd_list),
            max_timecode=len(q_debug),
            x_space=2.0,
            y_space=2.0,
            x_per_row=int(np.sqrt(len(usd_list))) + 1,
            local_asset_path="",
            dt=1,
        )