inference finish

hofee committed on 2025-01-15 14:42:54 +08:00
commit 6bb8835d00
30 changed files with 1158013 additions and 0 deletions


@@ -0,0 +1,340 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
class PointCloud:
def __init__(self, points, camera, name):
pass
class PointCloudGroup:
def __init__(self, point_clouds, name):
pass
@stereotype.runner("temp")
class CADCloseLoopOnlineRegStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_strategy")
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.output_dir = self.generate_config["output_dir"]
self.model_dir = self.generate_config["model_dir"]
self.object_name = self.generate_config["object_name"]
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_shot_view_num = self.reconstruct_config["max_shot_view_num"]
self.min_shot_new_pts_num = self.reconstruct_config["min_shot_new_pts_num"]
self.min_coverage_increase = self.reconstruct_config["min_coverage_increase"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold=0):
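# points below z_threshold belong to the display table (scan points); points at or above it belong to the object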
scan_pts = world_pts[world_pts[:, 2] < z_threshold]
obj_pts = world_pts[world_pts[:, 2] >= z_threshold]
return scan_pts, obj_pts
def loop_scan(self, first_cam_to_real_world):
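# take four shots, rotating the display table 90 degrees between views; map every view into the first camera's real-world frame and keep only the object points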
view_pts_list = []
first_view_data = CommunicateUtil.get_view_data(init=True)
ControlUtil.absolute_rotate_display_table(90)
first_pts = ViewUtil.get_pts(first_view_data)
first_real_world_pts = PtsUtil.transform_point_cloud(
first_pts, first_cam_to_real_world
)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
first_real_world_pts
)
view_pts_list.append(first_splitted_real_world_pts)
shot_num = 4
for i in range(shot_num-1):
view_data = CommunicateUtil.get_view_data()
if i != shot_num - 2:
ControlUtil.absolute_rotate_display_table(90)
time.sleep(0.5)
if view_data is None:
Log.error("No view data received")
continue
view_pts = ViewUtil.get_pts(view_data)
real_world_pts = PtsUtil.transform_point_cloud(
view_pts, first_cam_to_real_world
)
_, splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
real_world_pts
)
view_pts_list.append(splitted_real_world_pts)
return view_pts_list
def register(self):
ControlUtil.connect_robot()
""" init robot """
Log.info("start init")
ControlUtil.init()
first_cam_to_real_world = ControlUtil.get_pose()
""" loop shooting """
Log.info("start loop shooting")
view_pts_list = self.loop_scan(first_cam_to_real_world)
""" register """
Log.info("start register")
pts = np.vstack(view_pts_list)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
if not os.path.exists(os.path.join(self.output_dir, self.object_name)):
os.makedirs(os.path.join(self.output_dir, self.object_name))
scene_dir = os.path.join(self.output_dir, self.object_name)
model_path = os.path.join(self.model_dir, self.object_name, "mesh.obj")
cad_model = trimesh.load(model_path)
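# PtsUtil.register estimates the real-world-to-CAD transform from the merged scan; its inverse places the CAD model in the real world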
real_world_to_cad = PtsUtil.register(pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world: trimesh.Trimesh = cad_model.apply_transform(
cad_to_real_world
)
cad_model_real_world.export(os.path.join(scene_dir, "mesh.obj"))
#downsampled_pts = PtsUtil.voxel_downsample_point_cloud(pts, self.voxel_size)
np.savetxt(os.path.join(scene_dir, "pts_for_init_reg.txt"), pts)
return cad_to_real_world
def render_data(self):
scene_dir = os.path.join(self.output_dir, self.object_name)
result = subprocess.run(
[
self.blender_bin_path,
"-b",
"-P",
self.generator_script_path,
"--",
scene_dir,
],
capture_output=True,
text=True,
)
print(result)
def preprocess_data(self):
save_scene_data(self.output_dir, self.object_name, file_type="npy")
def get_scan_points_indices(self, scan_points, mask, object_mask_label= (0, 255, 0, 255), cam_intrinsic = None, cam_extrinsic = None):
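# project the display-table scan points through the camera (world -> camera via the inverse extrinsic, then camera -> pixels via the intrinsic) and keep the in-image points that do not land on the object mask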
scan_points_homogeneous = np.hstack((scan_points, np.ones((scan_points.shape[0], 1))))
points_camera = np.dot(np.linalg.inv(cam_extrinsic), scan_points_homogeneous.T).T[:, :3]
points_image_homogeneous = np.dot(cam_intrinsic, points_camera.T).T
points_image_homogeneous /= points_image_homogeneous[:, 2:]
pixel_x = points_image_homogeneous[:, 0].astype(int)
pixel_y = points_image_homogeneous[:, 1].astype(int)
h, w = mask.shape[:2]
valid_indices = (pixel_x >= 0) & (pixel_x < w) & (pixel_y >= 0) & (pixel_y < h)
mask_colors = mask[pixel_y[valid_indices], pixel_x[valid_indices]]
selected_points_indices = np.where((mask_colors != object_mask_label).any(axis=-1))[0]
selected_points_indices = np.where(valid_indices)[0][selected_points_indices]
return selected_points_indices
def run_one_model(self, model_name):
scene_dir = os.path.join(self.output_dir, model_name)
ControlUtil.connect_robot()
""" init robot """
Log.info("start init")
ControlUtil.init()
first_cam_to_real_world = ControlUtil.get_pose()
""" loop shooting """
Log.info("start loop shooting")
view_pts_list = self.loop_scan(first_cam_to_real_world)
""" register """
cad_path = os.path.join(scene_dir, "mesh.obj")
cad_model = trimesh.load(cad_path)
Log.info("start register")
init_pts = np.vstack(view_pts_list)
real_world_to_cad = PtsUtil.register(init_pts, cad_model)
curr_cad_to_real_world = np.linalg.inv(real_world_to_cad)
# np.savetxt(os.path.join("/home/yan20/nbv_rec/project/franka_control/debug", "pts_for_init_reg.txt"), init_pts)
# debug_cad = cad_model.apply_transform(curr_cad_to_real_world)
# debug_cad.export(os.path.join("/home/yan20/nbv_rec/project/franka_control/debug", "cad_for_init_reg.obj"))
pts_dir = os.path.join(scene_dir, "pts")
sample_view_pts_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(scene_dir, "pts", f"{frame_idx}.npy")
point_cloud = np.load(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(
point_cloud, self.voxel_size
)
sample_view_pts_list.append(sampled_point_cloud)
else:
sample_view_pts_list.append(np.zeros((0, 3)))
""" close-loop online registery strategy """
scanned_pts = PtsUtil.voxel_downsample_point_cloud(init_pts, voxel_size=self.voxel_size)
shot_pts_list = []
last_coverage = 0
Log.info("start close-loop control")
cnt = 0
mask_list = []
cam_to_cad_list = []
cam_R_to_cad_list = []
shot_view_idx_list = []
scan_points_path = os.path.join(self.output_dir, self.object_name, "scan_points.txt")
display_table_scan_points = np.loadtxt(scan_points_path)
for i in range(frame_num):
path = DataLoadUtil.get_path(self.output_dir, self.object_name, i)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
mask_list.append((mask_L, mask_R))
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
cam_to_cad = cam_info["cam_to_world"]
cam_to_cad_list.append(cam_to_cad)
cam_R_to_cad = cam_info["cam_to_world_R"]
cam_R_to_cad_list.append(cam_R_to_cad)
selected_view = []
while True:
# import ipdb; ipdb.set_trace()  # leftover debug breakpoint, disabled so the loop can run unattended
history_indices = []
scan_points_idx_list = []
for i in range(frame_num):
cam_to_cad = cam_to_cad_list[i]
cam_R_to_cad = cam_R_to_cad_list[i]
curr_cam_L_to_world = curr_cad_to_real_world @ cam_to_cad
curr_cam_R_to_world = curr_cad_to_real_world @ cam_R_to_cad
scan_points_indices_L = self.get_scan_points_indices(display_table_scan_points, mask_list[i][0], cam_intrinsic=cam_info["cam_intrinsic"], cam_extrinsic=curr_cam_L_to_world)
scan_points_indices_R = self.get_scan_points_indices(display_table_scan_points, mask_list[i][1], cam_intrinsic=cam_info["cam_intrinsic"], cam_extrinsic=curr_cam_R_to_world)
scan_points_indices = np.intersect1d(scan_points_indices_L, scan_points_indices_R)
scan_points_idx_list.append(scan_points_indices)
for shot_view_idx in shot_view_idx_list:
history_indices.append(scan_points_idx_list[shot_view_idx])
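# the NBV search runs in the CAD frame, so map the accumulated real-world scan into CAD coordinates with the current registration estimate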
cad_scanned_pts = PtsUtil.transform_point_cloud(scanned_pts, np.linalg.inv(curr_cad_to_real_world))
next_best_view, next_best_coverage, next_best_covered_num = (
ReconstructionUtil.compute_next_best_view_with_overlap(
cad_scanned_pts,
sample_view_pts_list,
selected_view,
history_indices,
scan_points_idx_list,
threshold=self.voxel_size,
overlap_area_threshold=25,
scan_points_threshold=self.scan_points_threshold,
)
)
if next_best_view is None:
Log.warning("No next best view found")
break
selected_view.append(next_best_view)
nbv_path = DataLoadUtil.get_path(self.output_dir, self.object_name, next_best_view)
nbv_cam_info = DataLoadUtil.load_cam_info(nbv_path, binocular=True)
nbv_cam_to_cad = nbv_cam_info["cam_to_world_O"]
nbv_cam_to_world = curr_cad_to_real_world @ nbv_cam_to_cad
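# CAMERA_CORRECTION is presumably a fixed calibration offset between the nominal view frame and the physically mounted camera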
target_camera_pose = nbv_cam_to_world @ ControlUtil.CAMERA_CORRECTION
ControlUtil.move_to(target_camera_pose)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("No view data received")
continue
shot_view_idx_list.append(next_best_view)
cam_shot_pts = ViewUtil.get_pts(view_data)
world_shot_pts = PtsUtil.transform_point_cloud(
cam_shot_pts, first_cam_to_real_world
)
_, world_splitted_shot_pts = self.split_scan_pts_and_obj_pts(
world_shot_pts
)
shot_pts_list.append(world_splitted_shot_pts)
debug_dir = os.path.join(scene_dir, "debug")
if not os.path.exists(debug_dir):
os.makedirs(debug_dir)
last_scanned_pts_num = scanned_pts.shape[0]
# import ipdb; ipdb.set_trace()  # leftover debug breakpoint, disabled
new_scanned_pts = PtsUtil.voxel_downsample_point_cloud(
np.vstack([scanned_pts, world_splitted_shot_pts]), self.voxel_size
)
# last_real_world_to_cad = real_world_to_cad
# real_world_to_cad = PtsUtil.register_fine(new_scanned_pts, cad_model)
# # rot distance of two rotation matrix
# rot_dist = np.arccos(
# (np.trace(real_world_to_cad[:3, :3].T @ last_real_world_to_cad[:3, :3]) - 1) / 2
# )
# print(f"-----rot dist: {rot_dist}")
curr_cad_to_real_world = np.linalg.inv(real_world_to_cad)
cad_splitted_shot_pts = PtsUtil.transform_point_cloud(world_splitted_shot_pts, real_world_to_cad)
np.savetxt(os.path.join(debug_dir, f"shot_pts_{cnt}.txt"), world_splitted_shot_pts)
np.savetxt(os.path.join(debug_dir, f"render_pts_{cnt}.txt"), sample_view_pts_list[next_best_view])
np.savetxt(os.path.join(debug_dir, f"reg_scanned_pts_{cnt}.txt"), new_scanned_pts)
cad_pts = cad_model.vertices
world_cad_pts = PtsUtil.transform_point_cloud(cad_pts, curr_cad_to_real_world)
np.savetxt(os.path.join(debug_dir, f"world_cad_pts_{cnt}.txt"), world_cad_pts)
#import ipdb; ipdb.set_trace()
new_scanned_pts_num = new_scanned_pts.shape[0]
scanned_pts = new_scanned_pts
Log.info(
f"Next Best cover pts: {next_best_covered_num}, Best coverage: {next_best_coverage}"
)
coverage_rate_increase = next_best_coverage - last_coverage
if coverage_rate_increase < self.min_coverage_increase:
Log.info(f"Coverage rate = {coverage_rate_increase} < {self.min_coverage_increase}, stop scanning")
# break
last_coverage = next_best_coverage
new_added_pts_num = new_scanned_pts_num - last_scanned_pts_num
if new_added_pts_num < self.min_shot_new_pts_num:
Log.info(f"New added pts num = {new_added_pts_num} < {self.min_shot_new_pts_num}")
#ipdb.set_trace()
if len(shot_pts_list) >= self.max_shot_view_num:
Log.info(f"Scanned view num = {len(shot_pts_list)} >= {self.max_shot_view_num}, stop scanning")
#break
cnt += 1
Log.success("[Part 4/4] finish close-loop control")
def run(self):
self.run_one_model(self.object_name)
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"/home/yan20/nbv_rec/data/models/workpiece_1/mesh.obj"
model = trimesh.load(model_path)


@@ -0,0 +1,347 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
from beans.pointcloud import Pointcloud, PointcloudGroup
@stereotype.runner("CAD_close_loop_online_reg_strategy_runner")
class CADCloseLoopOnlineRegStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_strategy")
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.output_dir = self.generate_config["output_dir"]
self.model_dir = self.generate_config["model_dir"]
self.object_name = self.generate_config["object_name"]
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_shot_view_num = self.reconstruct_config["max_shot_view_num"]
self.min_shot_new_pts_num = self.reconstruct_config["min_shot_new_pts_num"]
self.min_coverage_increase = self.reconstruct_config["min_coverage_increase"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold=0):
scan_pts = world_pts[world_pts[:, 2] < z_threshold]
obj_pts = world_pts[world_pts[:, 2] >= z_threshold]
return scan_pts, obj_pts
def loop_scan(self, first_cam_to_real_world):
loop_scan_pcl_group = PointcloudGroup(name="loop_scan_pcl_group")
first_view_data = CommunicateUtil.get_view_data(init=True)
ControlUtil.absolute_rotate_display_table(90)
first_pts = ViewUtil.get_pts(first_view_data)
first_real_world_pts = PtsUtil.transform_point_cloud(
first_pts, first_cam_to_real_world
)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
first_real_world_pts
)
first_real_pcl = Pointcloud(first_splitted_real_world_pts, first_cam_to_real_world, name='first_real_world_pts')
loop_scan_pcl_group.add(first_real_pcl)
shot_num = 4
for i in range(shot_num-1):
view_data = CommunicateUtil.get_view_data()
if i != shot_num - 2:
ControlUtil.absolute_rotate_display_table(90)
time.sleep(0.5)
if view_data is None:
Log.error("No view data received")
continue
view_pts = ViewUtil.get_pts(view_data)
real_world_pts = PtsUtil.transform_point_cloud(
view_pts, first_cam_to_real_world
)
_, splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
real_world_pts
)
loop_scan_real_world_pcl = Pointcloud(splitted_real_world_pts, None, name=f'loop_scan_real_world_pcl_{(i+1)*90} degree')
loop_scan_pcl_group.add(loop_scan_real_world_pcl)
#loop_scan_pcl_group.visualize()
ControlUtil.absolute_rotate_display_table(90)
return loop_scan_pcl_group
def register(self):
ControlUtil.connect_robot()
""" init robot """
Log.info("start init")
ControlUtil.init()
first_cam_to_real_world = ControlUtil.get_pose()
""" loop shooting """
Log.info("start loop shooting")
loop_scan_pcl_group = self.loop_scan(first_cam_to_real_world)
""" register """
Log.info("start register")
merged_world_scan_loop_pcl = loop_scan_pcl_group.merge_pointclouds(name="merged_world_scan_loop_pts")
pts = merged_world_scan_loop_pcl.points
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
if not os.path.exists(os.path.join(self.output_dir, self.object_name)):
os.makedirs(os.path.join(self.output_dir, self.object_name))
scene_dir = os.path.join(self.output_dir, self.object_name)
model_path = os.path.join(self.model_dir, self.object_name, "mesh.obj")
cad_model = trimesh.load(model_path)
real_world_to_cad = PtsUtil.register(pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world: trimesh.Trimesh = cad_model.apply_transform(
cad_to_real_world
)
cad_model_real_world.export(os.path.join(scene_dir, "mesh.obj"))
#downsampled_pts = PtsUtil.voxel_downsample_point_cloud(pts, self.voxel_size)
np.savetxt(os.path.join(scene_dir, "pts_for_init_reg.txt"), pts)
return cad_to_real_world
def render_data(self):
scene_dir = os.path.join(self.output_dir, self.object_name)
result = subprocess.run(
[
self.blender_bin_path,
"-b",
"-P",
self.generator_script_path,
"--",
scene_dir,
],
capture_output=True,
text=True,
)
print(result)
def preprocess_data(self):
save_scene_data(self.output_dir, self.object_name, file_type="npy")
def get_scan_points_indices(self, scan_points, mask, object_mask_label= (0, 255, 0, 255), cam_intrinsic = None, cam_extrinsic = None):
scan_points_homogeneous = np.hstack((scan_points, np.ones((scan_points.shape[0], 1))))
points_camera = np.dot(np.linalg.inv(cam_extrinsic), scan_points_homogeneous.T).T[:, :3]
points_image_homogeneous = np.dot(cam_intrinsic, points_camera.T).T
points_image_homogeneous /= points_image_homogeneous[:, 2:]
pixel_x = points_image_homogeneous[:, 0].astype(int)
pixel_y = points_image_homogeneous[:, 1].astype(int)
h, w = mask.shape[:2]
valid_indices = (pixel_x >= 0) & (pixel_x < w) & (pixel_y >= 0) & (pixel_y < h)
mask_colors = mask[pixel_y[valid_indices], pixel_x[valid_indices]]
selected_points_indices = np.where((mask_colors != object_mask_label).any(axis=-1))[0]
selected_points_indices = np.where(valid_indices)[0][selected_points_indices]
return selected_points_indices
def run_one_model(self, model_name):
scene_dir = os.path.join(self.output_dir, model_name)
ControlUtil.connect_robot()
""" init robot """
Log.info("start init")
ControlUtil.init()
first_cam_to_real_world = ControlUtil.get_pose()
""" loop shooting """
Log.info("start loop shooting")
loop_scan_pcl_group = self.loop_scan(first_cam_to_real_world)
""" register """
cad_path = os.path.join(scene_dir, "mesh.obj")
cad_model = trimesh.load(cad_path)
Log.info("start register")
merged_world_scan_loop_pcl = loop_scan_pcl_group.merge_pointclouds(name="merged_world_scan_loop_pts")
real_world_to_cad = PtsUtil.register(merged_world_scan_loop_pcl.points, cad_model)
curr_cad_to_real_world = np.linalg.inv(real_world_to_cad)
# np.savetxt(os.path.join("/home/yan20/nbv_rec/project/franka_control/debug", "pts_for_init_reg.txt"), init_pts)
# debug_cad = cad_model.apply_transform(curr_cad_to_real_world)
# debug_cad.export(os.path.join("/home/yan20/nbv_rec/project/franka_control/debug", "cad_for_init_reg.obj"))
pts_dir = os.path.join(scene_dir, "pts")
sample_view_pts_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(scene_dir, "pts", f"{frame_idx}.npy")
point_cloud = np.load(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(
point_cloud, self.voxel_size
)
sample_view_pts_list.append(sampled_point_cloud)
else:
sample_view_pts_list.append(np.zeros((0, 3)))
""" close-loop online registery strategy """
scanned_pcl = merged_world_scan_loop_pcl.voxel_downsample(self.voxel_size)
scanned_pts = scanned_pcl.points
shot_pcl_group = PointcloudGroup(name="world_shot_pcl_group")
last_coverage = 0
Log.info("start close-loop control")
cnt = 0
mask_list = []
cam_to_cad_list = []
cam_R_to_cad_list = []
shot_view_idx_list = []
scan_points_path = os.path.join(self.output_dir, self.object_name, "scan_points.txt")
display_table_scan_points = np.loadtxt(scan_points_path)
for i in range(frame_num):
path = DataLoadUtil.get_path(self.output_dir, self.object_name, i)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
mask_list.append((mask_L, mask_R))
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
cam_to_cad = cam_info["cam_to_world"]
cam_to_cad_list.append(cam_to_cad)
cam_R_to_cad = cam_info["cam_to_world_R"]
cam_R_to_cad_list.append(cam_R_to_cad)
selected_view = []
while True:
#import ipdb; ipdb.set_trace()
history_indices = []
scan_points_idx_list = []
for i in range(frame_num):
cam_to_cad = cam_to_cad_list[i]
cam_R_to_cad = cam_R_to_cad_list[i]
curr_cam_L_to_world = curr_cad_to_real_world @ cam_to_cad
curr_cam_R_to_world = curr_cad_to_real_world @ cam_R_to_cad
scan_points_indices_L = self.get_scan_points_indices(display_table_scan_points, mask_list[i][0], cam_intrinsic=cam_info["cam_intrinsic"], cam_extrinsic=curr_cam_L_to_world)
scan_points_indices_R = self.get_scan_points_indices(display_table_scan_points, mask_list[i][1], cam_intrinsic=cam_info["cam_intrinsic"], cam_extrinsic=curr_cam_R_to_world)
scan_points_indices = np.intersect1d(scan_points_indices_L, scan_points_indices_R)
scan_points_idx_list.append(scan_points_indices)
for shot_view_idx in shot_view_idx_list:
history_indices.append(scan_points_idx_list[shot_view_idx])
cad_scanned_pts = PtsUtil.transform_point_cloud(scanned_pts, np.linalg.inv(curr_cad_to_real_world))
next_best_view, next_best_coverage, next_best_covered_num = (
ReconstructionUtil.compute_next_best_view_with_overlap(
cad_scanned_pts,
sample_view_pts_list,
selected_view,
history_indices,
scan_points_idx_list,
threshold=self.voxel_size,
overlap_area_threshold=10,
scan_points_threshold=self.scan_points_threshold,
)
)
if next_best_view is None:
Log.warning("No next best view found")
break
selected_view.append(next_best_view)
nbv_path = DataLoadUtil.get_path(self.output_dir, self.object_name, next_best_view)
nbv_cam_info = DataLoadUtil.load_cam_info(nbv_path, binocular=True)
nbv_cam_to_cad = nbv_cam_info["cam_to_world_O"]
nbv_cam_to_world = curr_cad_to_real_world @ nbv_cam_to_cad
target_camera_pose = nbv_cam_to_world @ ControlUtil.CAMERA_CORRECTION
ControlUtil.move_to(target_camera_pose)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("No view data received")
continue
shot_view_idx_list.append(next_best_view)
cam_shot_pts = ViewUtil.get_pts(view_data)
world_shot_pts = PtsUtil.transform_point_cloud(
cam_shot_pts, first_cam_to_real_world
)
_, world_splitted_shot_pts = self.split_scan_pts_and_obj_pts(
world_shot_pts
)
world_shot_pcl = Pointcloud(world_splitted_shot_pts, nbv_cam_to_world, name=f"world_shot_pts_{cnt}")
cad_render_pcl = Pointcloud(sample_view_pts_list[next_best_view], nbv_cam_to_world, name=f"cad_render_pts_{cnt}")
world_render_pcl = cad_render_pcl.transform(curr_cad_to_real_world)
shot_pcl_group.add(world_shot_pcl)
render_shot_group = PointcloudGroup(pointclouds=[world_render_pcl, world_shot_pcl],name="render_shot_group")
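# pairing the rendered cloud with the freshly shot cloud makes it easy to eyeball the registration quality in the visualizer below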
# import ipdb; ipdb.set_trace()  # leftover debug breakpoint, disabled
render_shot_group.visualize()
debug_dir = os.path.join(scene_dir, "debug")
if not os.path.exists(debug_dir):
os.makedirs(debug_dir)
last_scanned_pts_num = scanned_pts.shape[0]
new_scanned_pts = PtsUtil.voxel_downsample_point_cloud(
np.vstack([scanned_pts, world_splitted_shot_pts]), self.voxel_size
)
# last_real_world_to_cad = real_world_to_cad
# real_world_to_cad = PtsUtil.register_fine(new_scanned_pts, cad_model)
# # rot distance of two rotation matrix
# rot_dist = np.arccos(
# (np.trace(real_world_to_cad[:3, :3].T @ last_real_world_to_cad[:3, :3]) - 1) / 2
# )
# print(f"-----rot dist: {rot_dist}")
curr_cad_to_real_world = np.linalg.inv(real_world_to_cad)
cad_splitted_shot_pts = PtsUtil.transform_point_cloud(world_splitted_shot_pts, real_world_to_cad)
np.savetxt(os.path.join(debug_dir, f"shot_pts_{cnt}.txt"), cad_splitted_shot_pts)
np.savetxt(os.path.join(debug_dir, f"render_pts_{cnt}.txt"), sample_view_pts_list[next_best_view])
np.savetxt(os.path.join(debug_dir, f"reg_scanned_pts_{cnt}.txt"), new_scanned_pts)
cad_pts = cad_model.vertices
world_cad_pts = PtsUtil.transform_point_cloud(cad_pts, curr_cad_to_real_world)
np.savetxt(os.path.join(debug_dir, f"world_cad_pts_{cnt}.txt"), world_cad_pts)
#import ipdb; ipdb.set_trace()
new_scanned_pts_num = new_scanned_pts.shape[0]
scanned_pts = new_scanned_pts
Log.info(
f"Next Best cover pts: {next_best_covered_num}, Best coverage: {next_best_coverage}"
)
coverage_rate_increase = next_best_coverage - last_coverage
if coverage_rate_increase < self.min_coverage_increase:
Log.info(f"Coverage rate = {coverage_rate_increase} < {self.min_coverage_increase}, stop scanning")
# break
last_coverage = next_best_coverage
new_added_pts_num = new_scanned_pts_num - last_scanned_pts_num
if new_added_pts_num < self.min_shot_new_pts_num:
Log.info(f"New added pts num = {new_added_pts_num} < {self.min_shot_new_pts_num}")
#ipdb.set_trace()
if len(shot_pcl_group) >= self.max_shot_view_num:
Log.info(f"Scanned view num = {len(shot_pcl_group)} >= {self.max_shot_view_num}, stop scanning")
#break
cnt += 1
Log.success("[Part 4/4] finish close-loop control")
def run(self):
self.run_one_model(self.object_name)
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"/home/yan20/nbv_rec/data/models/workpiece_1/mesh.obj"
model = trimesh.load(model_path)


@@ -0,0 +1,242 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
@stereotype.runner("CAD_close_loop_strategy_runner")
class CADCloseLoopStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_strategy")
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.model_dir = self.generate_config["model_dir"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_shot_view_num = self.reconstruct_config["max_shot_view_num"]
self.min_shot_new_pts_num = self.reconstruct_config["min_shot_new_pts_num"]
self.min_coverage_increase = self.reconstruct_config["min_coverage_increase"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold=0):
scan_pts = world_pts[world_pts[:, 2] < z_threshold]
obj_pts = world_pts[world_pts[:, 2] >= z_threshold]
return scan_pts, obj_pts
def run_one_model(self, model_name):
time_code = time.strftime("%Y%m%d%H%M%S", time.localtime())
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output_" + time_code
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
ControlUtil.connect_robot()
""" init robot """
Log.info("[Part 1/5] start init and register")
ControlUtil.init()
""" load CAD model """
model_path = os.path.join(self.model_dir, model_name, "mesh.obj")
temp_name = "cad_model_world"
cad_model = trimesh.load(model_path)
""" take first view """
Log.info("[Part 1/5] take first view data")
view_data = CommunicateUtil.get_view_data(init=True)
first_cam_pts = ViewUtil.get_pts(view_data)
first_cam_to_real_world = ControlUtil.get_pose()
first_real_world_pts = PtsUtil.transform_point_cloud(
first_cam_pts, first_cam_to_real_world
)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
first_real_world_pts
)
debug_dir = os.path.join(temp_dir, "debug")
if not os.path.exists(debug_dir):
os.makedirs(debug_dir)
np.savetxt(os.path.join(debug_dir,"FIRST_shot.txt"), first_splitted_real_world_pts)
# np.savetxt(f"first_real_pts_{model_name}.txt", first_splitted_real_world_pts)
""" register """
Log.info("[Part 1/4] do registeration")
real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("[Part 1/4] finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world: trimesh.Trimesh = cad_model.apply_transform(
cad_to_real_world
)
cad_model_real_world.export(
os.path.join(temp_dir, f"real_world_{temp_name}.obj")
)
cad_model_blender_world: trimesh.Trimesh = cad_model.apply_transform(
real_world_to_blender_world
)
# NOTE: temp_dir is immediately reassigned to a fixed path below, so the TemporaryDirectory context is effectively unused
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output_" + time_code
cad_model_blender_world.export(os.path.join(temp_dir, f"{temp_name}.obj"))
""" sample view """
Log.info("[Part 2/4] start running renderer")
subprocess.run(
[
self.blender_bin_path,
"-b",
"-P",
self.generator_script_path,
"--",
temp_dir,
],
capture_output=True,
text=True,
)
Log.success("[Part 2/4] finish running renderer")
""" preprocess """
Log.info("[Part 3/4] start preprocessing data")
save_scene_data(temp_dir, temp_name)
Log.success("[Part 3/4] finish preprocessing data")
pts_dir = os.path.join(temp_dir, temp_name, "pts")
sample_view_pts_list = []
scan_points_idx_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(temp_dir, temp_name, "pts", f"{frame_idx}.txt")
idx_path = os.path.join(
temp_dir, temp_name, "scan_points_indices", f"{frame_idx}.npy"
)
point_cloud = np.loadtxt(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(
point_cloud, self.voxel_size
)
indices = np.load(idx_path)
try:
len(indices)
except TypeError:
# a zero-dimensional array has no len(); wrap the scalar index in an array
indices = np.array([indices])
sample_view_pts_list.append(sampled_point_cloud)
scan_points_idx_list.append(indices)
""" close-loop strategy """
scanned_pts = PtsUtil.voxel_downsample_point_cloud(
first_splitted_real_world_pts, self.voxel_size
)
shot_pts_list = [first_splitted_real_world_pts]
history_indices = []
last_coverage = 0
Log.info("[Part 4/4] start close-loop control")
cnt = 0
selected_view = []
while True:
#import ipdb; ipdb.set_trace()
next_best_view, next_best_coverage, next_best_covered_num = (
ReconstructionUtil.compute_next_best_view_with_overlap(
scanned_pts,
sample_view_pts_list,
selected_view,
history_indices,
scan_points_idx_list,
threshold=self.voxel_size,
overlap_area_threshold=50,
scan_points_threshold=self.scan_points_threshold,
)
)
if next_best_view is None:
Log.warning("No next best view found")
break
selected_view.append(next_best_view)
nbv_path = DataLoadUtil.get_path(temp_dir, temp_name, next_best_view)
nbv_cam_info = DataLoadUtil.load_cam_info(nbv_path, binocular=True)
nbv_cam_to_world = nbv_cam_info["cam_to_world_O"]
ControlUtil.move_to(nbv_cam_to_world)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("No view data received")
continue
cam_shot_pts = ViewUtil.get_pts(view_data)
world_shot_pts = PtsUtil.transform_point_cloud(
cam_shot_pts, first_cam_to_real_world
)
_, world_splitted_shot_pts = self.split_scan_pts_and_obj_pts(
world_shot_pts
)
shot_pts_list.append(world_splitted_shot_pts)
np.savetxt(os.path.join(debug_dir, f"shot_pts_{cnt}.txt"), world_splitted_shot_pts)
np.savetxt(os.path.join(debug_dir, f"render_pts_{cnt}.txt"), sample_view_pts_list[next_best_view])
#real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
#import ipdb; ipdb.set_trace()
last_scanned_pts_num = scanned_pts.shape[0]
new_scanned_pts = PtsUtil.voxel_downsample_point_cloud(
np.vstack([scanned_pts, world_splitted_shot_pts]), self.voxel_size
)
new_scanned_pts_num = new_scanned_pts.shape[0]
history_indices.append(scan_points_idx_list[next_best_view])
scanned_pts = new_scanned_pts
Log.info(
f"Next Best cover pts: {next_best_covered_num}, Best coverage: {next_best_coverage}"
)
coverage_rate_increase = next_best_coverage - last_coverage
if coverage_rate_increase < self.min_coverage_increase:
Log.info(f"Coverage rate = {coverage_rate_increase} < {self.min_coverage_increase}, stop scanning")
# break
last_coverage = next_best_coverage
new_added_pts_num = new_scanned_pts_num - last_scanned_pts_num
if new_added_pts_num < self.min_shot_new_pts_num:
Log.info(f"New added pts num = {new_added_pts_num} < {self.min_shot_new_pts_num}")
#ipdb.set_trace()
if len(shot_pts_list) >= self.max_shot_view_num:
Log.info(f"Scanned view num = {len(shot_pts_list)} >= {self.max_shot_view_num}, stop scanning")
#break
cnt += 1
Log.success("[Part 4/4] finish close-loop control")
def run(self):
total = len(os.listdir(self.model_dir))
model_start_idx = self.generate_config["model_start_idx"]
count_object = model_start_idx
# for model_name in os.listdir(self.model_dir[model_start_idx:]):
# Log.info(f"[{count_object}/{total}]Processing {model_name}")
# self.run_one_model(model_name)
# Log.success(f"[{count_object}/{total}]Finished processing {model_name}")
self.run_one_model("box_1")
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"C:\Users\hofee\Downloads\mesh.obj"
model = trimesh.load(model_path)


@@ -0,0 +1,224 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
@stereotype.runner("CAD_open_loop_strategy_runner")
class CADOpenLoopStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_open_loop_strategy")
self.status_info = {
"status_manager": status_manager,
"app_name": "cad",
"runner_name": "CAD_open_loop_strategy_runner"
}
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.model_dir = self.generate_config["model_dir"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_view = self.generate_config["max_view"]
self.min_view = self.generate_config["min_view"]
self.max_diag = self.generate_config["max_diag"]
self.min_diag = self.generate_config["min_diag"]
self.min_cam_table_included_degree = self.generate_config["min_cam_table_included_degree"]
self.random_view_ratio = self.generate_config["random_view_ratio"]
self.soft_overlap_threshold = self.reconstruct_config["soft_overlap_threshold"]
self.hard_overlap_threshold = self.reconstruct_config["hard_overlap_threshold"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold = 0):
scan_pts = world_pts[world_pts[:,2] < z_threshold]
obj_pts = world_pts[world_pts[:,2] >= z_threshold]
return scan_pts, obj_pts
def run_one_model(self, model_name):
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
result = dict()
shot_pts_list = []
ControlUtil.connect_robot()
''' init robot '''
Log.info("[Part 1/5] start init and register")
ControlUtil.init()
''' load CAD model '''
model_path = os.path.join(self.model_dir, model_name,"mesh.ply")
temp_name = "cad_model_world"
cad_model = trimesh.load(model_path)
''' take first view '''
Log.info("[Part 1/5] take first view data")
view_data = CommunicateUtil.get_view_data(init=True)
first_cam_pts = ViewUtil.get_pts(view_data)
first_cam_to_real_world = ControlUtil.get_pose()
first_real_world_pts = PtsUtil.transform_point_cloud(first_cam_pts, first_cam_to_real_world)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(first_real_world_pts)
np.savetxt(f"first_real_pts_{model_name}.txt", first_splitted_real_world_pts)
''' register '''
Log.info("[Part 1/5] do registeration")
real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("[Part 1/5] finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world:trimesh.Trimesh = cad_model.apply_transform(cad_to_real_world)
cad_model_real_world.export(os.path.join(temp_dir, f"real_world_{temp_name}.obj"))
cad_model_blender_world:trimesh.Trimesh = cad_model.apply_transform(real_world_to_blender_world)
# NOTE: temp_dir is immediately reassigned to a fixed path below, so the TemporaryDirectory context is effectively unused
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
cad_model_blender_world.export(os.path.join(temp_dir, f"{temp_name}.obj"))
scene_dir = os.path.join(temp_dir, temp_name)
''' sample view '''
Log.info("[Part 2/5] start running renderer")
subprocess.run([
self.blender_bin_path, '-b', '-P', self.generator_script_path, '--', temp_dir
], capture_output=True, text=True)
Log.success("[Part 2/5] finish running renderer")
world_model_points = np.loadtxt(os.path.join(scene_dir, "points_and_normals.txt"))[:,:3]
''' preprocess '''
Log.info("[Part 3/5] start preprocessing data")
save_scene_data(temp_dir, temp_name)
Log.success("[Part 3/5] finish preprocessing data")
pts_dir = os.path.join(temp_dir,temp_name,"pts")
sample_view_pts_list = []
scan_points_idx_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(temp_dir,temp_name, "pts", f"{frame_idx}.txt")
idx_path = os.path.join(temp_dir,temp_name, "scan_points_indices", f"{frame_idx}.npy")
point_cloud = np.loadtxt(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud, self.voxel_size)
indices = np.load(idx_path)
try:
len(indices)
except TypeError:
# a zero-dimensional array has no len(); wrap the scalar index in an array
indices = np.array([indices])
sample_view_pts_list.append(sampled_point_cloud)
scan_points_idx_list.append(indices)
''' generate strategy '''
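# open-loop variant: the full best-view sequence is computed offline from the rendered views before the robot moves at all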
Log.info("[Part 4/5] start generating strategy")
limited_useful_view, _, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(
world_model_points, sample_view_pts_list,
scan_points_indices_list = scan_points_idx_list,
init_view=0,
threshold=self.voxel_size,
soft_overlap_threshold = self.soft_overlap_threshold,
hard_overlap_threshold = self.hard_overlap_threshold,
scan_points_threshold = self.scan_points_threshold,
status_info=self.status_info
)
Log.success("[Part 4/5] finish generating strategy")
''' extract cam_to_world sequence '''
cam_to_world_seq = []
coverage_rate_seq = []
render_pts = []
idx_seq = []
for idx, coverage_rate in limited_useful_view:
path = DataLoadUtil.get_path(temp_dir, temp_name, idx)
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
cam_to_world_seq.append(cam_info["cam_to_world_O"])
coverage_rate_seq.append(coverage_rate)
idx_seq.append(idx)
render_pts.append(sample_view_pts_list[idx])
Log.info("[Part 5/5] start running robot")
''' take best seq view '''
#import ipdb; ipdb.set_trace()
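# the union of all rendered views is the best surface this view set can possibly cover, so it serves as the reference for both the ground-truth and the real coverage rates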
target_scanned_pts = np.concatenate(sample_view_pts_list)
voxel_downsampled_target_scanned_pts = PtsUtil.voxel_downsample_point_cloud(target_scanned_pts, self.voxel_size)
result = dict()
gt_scanned_pts = np.concatenate(render_pts, axis=0)
voxel_down_sampled_gt_scanned_pts = PtsUtil.voxel_downsample_point_cloud(gt_scanned_pts, self.voxel_size)
result["gt_final_coverage_rate_cad"] = ReconstructionUtil.compute_coverage_rate(voxel_downsampled_target_scanned_pts, voxel_down_sampled_gt_scanned_pts, self.voxel_size)
step = 1
result["real_coverage_rate_seq"] = []
for cam_to_world in cam_to_world_seq:
try:
ControlUtil.move_to(cam_to_world)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("Failed to get view data")
continue
cam_pts = ViewUtil.get_pts(view_data)
shot_pts_list.append(cam_pts)
scanned_pts = np.concatenate(shot_pts_list, axis=0)
voxel_down_sampled_scanned_pts = PtsUtil.voxel_downsample_point_cloud(scanned_pts, self.voxel_size)
voxel_down_sampled_scanned_pts_world = PtsUtil.transform_point_cloud(voxel_down_sampled_scanned_pts, first_cam_to_real_world)
curr_CR = ReconstructionUtil.compute_coverage_rate(voxel_downsampled_target_scanned_pts, voxel_down_sampled_scanned_pts_world, self.voxel_size)
Log.success(f"(step {step}/{len(cam_to_world_seq)}) current coverage: {curr_CR} | gt coverage: {result['gt_final_coverage_rate_cad']}")
result["real_final_coverage_rate"] = curr_CR
result["real_coverage_rate_seq"].append(curr_CR)
step += 1
except Exception as e:
Log.error(f"Failed to move to {cam_to_world}")
Log.error(e)
#import ipdb;ipdb.set_trace()
for idx in range(len(shot_pts_list)):
if not os.path.exists(os.path.join(temp_dir, temp_name, "shot_pts")):
os.makedirs(os.path.join(temp_dir, temp_name, "shot_pts"))
if not os.path.exists(os.path.join(temp_dir, temp_name, "render_pts")):
os.makedirs(os.path.join(temp_dir, temp_name, "render_pts"))
shot_pts = PtsUtil.transform_point_cloud(shot_pts_list[idx], first_cam_to_real_world)
np.savetxt(os.path.join(temp_dir, temp_name, "shot_pts", f"{idx}.txt"), shot_pts)
np.savetxt(os.path.join(temp_dir, temp_name, "render_pts", f"{idx}.txt"), render_pts[idx])
Log.success("[Part 5/5] finish running robot")
Log.debug(result)
def run(self):
total = len(os.listdir(self.model_dir))
model_start_idx = self.generate_config["model_start_idx"]
count_object = model_start_idx
for model_name in os.listdir(self.model_dir)[model_start_idx:]:
Log.info(f"[{count_object}/{total}] Processing {model_name}")
self.run_one_model(model_name)
Log.success(f"[{count_object}/{total}] Finished processing {model_name}")
count_object += 1
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"C:\Users\hofee\Downloads\mesh.obj"
model = trimesh.load(model_path)

runners/calibration.py

@@ -0,0 +1,475 @@
import cv2
import numpy as np
import open3d as o3d
import sys, os
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
import pickle
from tqdm import tqdm
import json
import pyrealsense2 as rs
# marker parameters
MARKER_TYPE = "aruco"
MARKER_INFO = {"id": 582, "size": 0.1}
# view parameters
VIEW_NUM = 6
LOOK_AT_PT = np.array([0.5, 0, 0.2])
TOP_DOWN_DIST = [0.3, 0.45, 0.55]
TOP_DOWN_DIRECTION = np.array([0., 0., -1.])
RADIUS = [0.25, 0.2, 0.1]
INIT_GRIPPER_POSE = np.array([
[1, 0, 0, 0.56],
[0, -1, 0, 0],
[0, 0, -1, 0.6],
[0, 0, 0, 1]
])
DEFAULT_EE_TO_BASE = np.array([
[0, -1, 0, 0],
[1, 0, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
class HandEyeCalibration:
"""
A class to perform hand-eye calibration for a robotic system using various camera types and markers.
Attributes:
marker_to_cam (list): List to store marker to camera transformation matrices.
end_effector_to_base (list): List to store end-effector to base transformation matrices.
marker_type (str): Type of marker used for calibration (default is "aruco").
marker_info (dict): Information about the marker, including its ID and size.
camera (object): Camera object used for capturing images.
intrinsics (dict): Intrinsic parameters of the camera.
dist (optional): Distortion coefficients of the camera.
marker_solver (object): Solver object for detecting and solving marker poses.
camera_type (str): Type of camera used (default is "RealSense"). "RealSense" and "Inspire" are supported.
Methods:
get_marker_to_cam_pose(color_image, visualize=False):
Gets the pose of the marker relative to the camera from a color image.
capture_single_frame():
Captures a single frame from the camera and gets the current end-effector pose.
capture_frames(view_num, look_at_pt, top_down_dist, top_down_direrction, radius, visualize=False):
Captures multiple frames by moving the end-effector to different poses and records the marker and end-effector poses.
get_hand_eye(marker_to_cam_poses, end_effector_to_base_poses):
Computes the hand-eye calibration matrix using the captured marker and end-effector poses.
calibrate(view_num, look_at_pt, top_down_dist, top_down_direrction, radius):
Performs the full calibration process by capturing frames and computing the hand-eye calibration matrix.
"""
def __init__(self, dist=None, marker_type="aruco", marker_info={"id": 0, "size": 0.1}, camera_type="RealSense"):
self.marker_to_cam = []
self.end_effector_to_base = []
self.marker_type = marker_type
self.marker_info = marker_info
ControlUtil.connect_robot()
ControlUtil.franka_reset()
if camera_type == "RealSense":
self.camera = RealSenseCamera()
elif camera_type == "Inspire":
self.camera = InspireCamera()
else:
raise NotImplementedError(f"camera_type {camera_type} is not supported")
self.intrinsics = self.camera.get_color_intrinsics()
self.dist = dist
self.marker_type = marker_type
if self.marker_type == "aruco":
self.marker_solver = ArcuoMarkerSolver(self.intrinsics, dist, marker_info)
self.camera_type = camera_type
def get_marker_to_cam_pose(self, color_image, visualize=False):
res = self.marker_solver.get_marker_to_cam_pose(color_image, visualize)
return res
def capture_single_frame(self):
color_image = self.camera.get_color_image()
end_effector_pose = ControlUtil.get_curr_gripper_to_base_pose()
return color_image, end_effector_pose
def capture_frames(self, view_num, look_at_pt, top_down_dist, top_down_direrction, radius, visualize=False):
circle_num = len(radius)
poses = []
for i in range(circle_num):
reverse_order = i % 2 == 1
pose_generator = EndEffectorPoseGenerator(view_num, look_at_pt, top_down_dist[i], top_down_direrction, radius[i], reverse_order)
poses += [pose @ DEFAULT_EE_TO_BASE for pose in pose_generator.poses]
if visualize:
pose_visualizer = PoseVisualizer(poses)
pose_visualizer.visualize()
pbar = tqdm(total=len(poses), desc="Capturing frames")
for pose in poses:
pbar.update(1)
ControlUtil.set_gripper_pose(pose)
color_image, end_effector_pose = self.capture_single_frame()
if color_image is None:
print("Failed to capture the frame")
continue
res = self.get_marker_to_cam_pose(color_image, visualize=True)
if res is None:
print("Failed to get marker pose")
continue
else:
marker_to_cam_pose = np.eye(4)
rvec = res["rvec"]
tvec = res["tvec"]
rot_mat = cv2.Rodrigues(rvec)[0]
marker_to_cam_pose[:3, :3] = rot_mat
marker_to_cam_pose[:3, 3] = tvec
self.marker_to_cam.append(marker_to_cam_pose)
self.end_effector_to_base.append(end_effector_pose)
def get_hand_eye(self, marker_to_cam_poses, end_effector_to_base_poses):
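# solves the hand-eye equation AX = XB from paired marker-to-camera and end-effector-to-base poses using OpenCV's Tsai-Lenz solver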
############################## Debug ################################
# with open("marker_to_cam_poses.pkl", "wb") as f:
# pickle.dump(marker_to_cam_poses, f)
# with open("end_effector_to_base_poses.pkl", "wb") as f:
# pickle.dump(end_effector_to_base_poses, f)
# with open("marker_to_cam_poses.pkl", "rb") as f:
# marker_to_cam_poses = pickle.load(f)
# with open("end_effector_to_base_poses.pkl", "rb") as f:
# end_effector_to_base_poses = pickle.load(f)
#####################################################################
R_marker_to_cam = []
t_marker_to_cam = []
R_end_effector_to_base = []
t_end_effector_to_base = []
for index, marker_to_cam_pose in enumerate(marker_to_cam_poses):
if marker_to_cam_pose is None:
continue
R_marker_to_cam.append(marker_to_cam_pose[:3, :3])
t_marker_to_cam.append(marker_to_cam_pose[:3, 3])
R_end_effector_to_base.append(end_effector_to_base_poses[index][:3, :3])
t_end_effector_to_base.append(end_effector_to_base_poses[index][:3, 3])
if len(R_marker_to_cam) < 3:
print("Not enough data to calibrate")
return None
R_marker_to_cam = np.array(R_marker_to_cam)
t_marker_to_cam = np.array(t_marker_to_cam)
R_end_effector_to_base = np.array(R_end_effector_to_base)
t_end_effector_to_base = np.array(t_end_effector_to_base)
# from ipdb import set_trace; set_trace()  # leftover debug breakpoint, disabled so calibration can run unattended
hand_eye = cv2.calibrateHandEye(R_end_effector_to_base, t_end_effector_to_base, R_marker_to_cam, t_marker_to_cam, cv2.CALIB_HAND_EYE_TSAI)
cv2.destroyAllWindows()
return hand_eye
def calibrate(self, view_num, look_at_pt, top_down_dist, top_down_direrction, radius):
self.capture_frames(view_num, look_at_pt, top_down_dist, top_down_direrction, radius)
marker_to_cam = np.array(self.marker_to_cam)
end_effector_to_base = np.array(self.end_effector_to_base)
hand_eye = self.get_hand_eye(marker_to_cam, end_effector_to_base)
# hand_eye = self.get_hand_eye(None, None)
return hand_eye
class ArcuoMarkerSolver:
def __init__(self, intrinsics, dist=None, marker_info={"id": 0, "size": 0.1}):
self.intrinsics = intrinsics
self.dist = dist
self.marker_info = marker_info
def solve_arcuo_marker_pose(self, color_image):
self.res = []
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_ARUCO_ORIGINAL)
aruco_params = cv2.aruco.DetectorParameters()
cv2.imshow("color_image", color_image)
cv2.waitKey(500)
corners, ids, rejectedImgPoints = cv2.aruco.detectMarkers(color_image, aruco_dict, parameters=aruco_params)
if ids is None:
return None
for i in range(len(ids)):
rvec, tvec, _ = cv2.aruco.estimatePoseSingleMarkers(corners[i], self.marker_info["size"], self.intrinsics, self.dist)
self.res.append({
'rvec': rvec,
'tvec': tvec,
'corners': corners[i],
'ids': ids[i]
})
for res in self.res:
if res["ids"][0] == self.marker_info["id"]:
return res
return None
def get_marker_to_cam_pose(self, color_image, visualize=False):
res = self.solve_arcuo_marker_pose(color_image)
if visualize and res is not None:
self.visualize_res(color_image, res)
return res
def visualize_res(self, color_image, res):
rvec = res["rvec"]
tvec = res["tvec"]
corners = res["corners"]
ids = res["ids"]
aruco_frame = cv2.drawFrameAxes(color_image, self.intrinsics, self.dist, rvec, tvec, 0.05)
aruco_frame = cv2.aruco.drawDetectedMarkers(aruco_frame, (corners, ), ids)
cv2.imshow("aruco_frame", aruco_frame)
cv2.waitKey(500)
class EndEffectorPoseGenerator:
def __init__(self, view_num, look_at_pt, top_down_dist, top_down_direrction, radius, reverse_order=False):
self.view_num = view_num
self.look_at_pt = look_at_pt
self.top_down_dist = top_down_dist
self.top_down_direrction = top_down_direrction
self.radius = radius
self.poses = self.generate_poses(reverse_order)
def generate_poses(self, reverse_order=False):
poses = []
for i in range(self.view_num):
order = i if not reverse_order else self.view_num - i - 1
pose = self.get_pose(order)
poses.append(pose)
return poses
def get_pose(self, i):
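# place the camera on a circle of the given radius around the look-at point and build a look-at rotation; the small random noise keeps the cross product well-defined when the view direction is nearly parallel to the up direction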
angle = 0.8 * np.pi * i / self.view_num + 0.7 * np.pi
start_point_x = self.look_at_pt[0] + self.radius * np.cos(angle)
start_point_y = self.look_at_pt[1] + self.radius * np.sin(angle)
start_point_z = self.look_at_pt[2] + self.top_down_dist
look_at_vector = self.look_at_pt - np.array([start_point_x, start_point_y, start_point_z])
look_at_vector = look_at_vector / np.linalg.norm(look_at_vector)
up_vector = self.top_down_direrction
noise = np.random.uniform(0, 0.1, up_vector.shape)
right_vector = np.cross(look_at_vector, up_vector + noise)
right_vector = right_vector / np.linalg.norm(right_vector)
up_vector = np.cross(look_at_vector, right_vector)
up_vector = up_vector / np.linalg.norm(up_vector)
pose = np.eye(4)
pose[:3, :3] = self.get_rotation(right_vector, up_vector, look_at_vector)
pose[:3, 3] = np.array([start_point_x, start_point_y, start_point_z])
return pose
def get_rotation(self, right_vector, up_vector, look_at_vector):
rotation = np.eye(3)
rotation[:, 0] = right_vector
rotation[:, 1] = up_vector
rotation[:, 2] = look_at_vector
return rotation
class PoseVisualizer:
def __init__(self, poses):
self.poses = poses
def visualize(self):
vis = o3d.visualization.Visualizer()
vis.create_window()
for pose in self.poses:
mesh_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.05)
mesh_frame.transform(pose)
vis.add_geometry(mesh_frame)
print("Press q to close the window")
vis.run()
class CAMERA:
def __init__(self):
self.color_intrinsics = None
self.depth_intrinsics = None
self.color = None
self.depth = None
def get_color_intrinsics(self):
pass
def get_depth_intrinsics(self):
pass
def get_color_image(self):
pass
def get_depth_image(self):
pass
class RealSenseCamera(CAMERA):
def __init__(self):
super().__init__()
self.pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
align = rs.align(rs.stream.color)
self.pipeline.start(config)
self.color_intrinsics = self.get_intrinsics("color")
self.depth_intrinsics = self.get_intrinsics("depth")
def get_color_intrinsics(self):
return self.color_intrinsics
def get_depth_intrinsics(self):
return self.depth_intrinsics
def get_color_image(self):
frames = self.pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
color_image = np.asanyarray(color_frame.get_data())
return color_image
def get_depth_image(self):
frames = self.pipeline.wait_for_frames()
depth_frame = frames.get_depth_frame()
depth_image = np.asanyarray(depth_frame.get_data())
return depth_image
def get_intrinsics(self, camera_type):
if camera_type == "color":
profile = self.pipeline.get_active_profile()
color_profile = rs.video_stream_profile(profile.get_stream(rs.stream.color))
color_intrinsics = color_profile.get_intrinsics()
K = [[color_intrinsics.fx, 0, color_intrinsics.ppx], [0, color_intrinsics.fy, color_intrinsics.ppy], [0, 0, 1]]
return np.array(K)
elif camera_type == "depth":
profile = self.pipeline.get_active_profile()
depth_profile = rs.video_stream_profile(profile.get_stream(rs.stream.depth))
depth_intrinsics = depth_profile.get_intrinsics()
K = [[depth_intrinsics.fx, 0, depth_intrinsics.ppx], [0, depth_intrinsics.fy, depth_intrinsics.ppy], [0, 0, 1]]
return np.array(K)
class InspireCamera(CAMERA):
def __init__(self):
super().__init__()
ControlUtil.set_gripper_pose(INIT_GRIPPER_POSE)
view_data = CommunicateUtil.get_view_data(init=True)
self.color_intrinsics = self.get_intrinsics(view_data["color_intrinsics"], view_data["color_image"])
self.depth_intrinsics = self.get_intrinsics(view_data["depth_intrinsics"], view_data["depth_image"])
def get_color_intrinsics(self):
return self.color_intrinsics
def get_depth_intrinsics(self):
return self.depth_intrinsics
def get_color_image(self):
view_data = CommunicateUtil.get_view_data()
if view_data is None:
return None
else:
color_image = view_data["color_image"]
return color_image
def get_depth_image(self):
view_data = CommunicateUtil.get_view_data()
if view_data is None:
return None
else:
depth_image = view_data["depth_image"]
return depth_image
def get_intrinsics(self, intrinsics, image):
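# the reported intrinsics may correspond to a different resolution than the delivered image, so rescale fx/fy/cx/cy by the width and height ratios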
height = image.shape[0]
width = image.shape[1]
scale_h = height / intrinsics["height"]
scale_w = width / intrinsics["width"]
fx = intrinsics["fx"] * scale_w
fy = intrinsics["fy"] * scale_h
cx = intrinsics["cx"] * scale_w
cy = intrinsics["cy"] * scale_h
K = np.array([
[fx, 0, cx],
[ 0, fy, cy],
[ 0, 0, 1]
])
return K
def example_view_generator():
pose_generator = EndEffectorPoseGenerator(VIEW_NUM, LOOK_AT_PT, TOP_DOWN_DIST, TOP_DOWN_DIRECTION, RADIUS)
base_pose = np.eye(4)
test_pose = np.array([
[1, 0, 0, 0],
[0, -1, 0, 0],
[0, 0, -1, 0.2],
[0, 0, 0, 1]
])
defalut_ee2base = np.array([
[0, -1, 0, 0],
[1, 0, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
poses = [pose @ defalut_ee2base for pose in pose_generator.poses] + [base_pose, test_pose]
visualizer = PoseVisualizer(poses)
visualizer.visualize()
def example_calibration():
# ControlUtil.connect_robot()
# ControlUtil.franka_reset()
# ControlUtil.set_gripper_pose(INIT_GRIPPER_POSE)
# view_data = CommunicateUtil.get_view_data(init=True)
# intrinsics = get_intrinsics(view_data["color_intrinsics"], view_data["color_image"])
calibration = HandEyeCalibration(
# intrinsics=intrinsics,
dist=None,
marker_type=MARKER_TYPE,
marker_info=MARKER_INFO,
camera_type="Inspire"
)
hand_eye = calibration.calibrate(
view_num=VIEW_NUM,
look_at_pt=LOOK_AT_PT,
top_down_dist=TOP_DOWN_DIST,
top_down_direrction=TOP_DOWN_DIRECTION,
radius=RADIUS
)
print(hand_eye)
res = {
'rotation': hand_eye[0].tolist(),
'translation': hand_eye[1].tolist()
}
with open("hand_eye.json", "w") as f:
json.dump(res, f, indent=4)
print("Hand eye calibration finished")
return res
def get_depth_to_end_effector_pose(calibration_res_path):
with open(calibration_res_path, "r") as f:
data = json.load(f)
rotation = np.array(data["rotation"])
translation = np.array(data["translation"])
    color_to_ee = np.eye(4)
    color_to_ee[:3, :3] = rotation
    # reshape so both a flat [x, y, z] list and a 3x1 column from the solver fit
    color_to_ee[:3, 3] = np.asarray(translation).reshape(3)
ControlUtil.connect_robot()
ControlUtil.franka_reset()
ControlUtil.set_gripper_pose(INIT_GRIPPER_POSE)
view_data = CommunicateUtil.get_view_data(init=True)
depth_to_color = np.array(view_data["depth_to_color"])
depth_to_ee = color_to_ee @ depth_to_color
print(depth_to_ee)
with open("depth_to_ee.json", "w") as f:
json.dump(depth_to_ee.tolist(), f, indent=4)
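# The composition above chains depth -> color -> end effector. A hedged usage
# sketch for the saved transform: combined with the robot's current
# end-effector pose it yields the depth camera pose in the robot base frame.
# `get_ee_pose` is a hypothetical accessor, not defined in the code shown here:
#
#   with open("depth_to_ee.json", "r") as f:
#       depth_to_ee = np.array(json.load(f))
#   ee_to_base = get_ee_pose()                  # 4x4, hypothetical accessor
#   depth_to_base = ee_to_base @ depth_to_ee    # depth camera in base frame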
if __name__ == "__main__":
res = example_calibration()
get_depth_to_end_effector_pose("hand_eye.json")
# example_view_generator()

182
runners/inference.py Normal file
View File

@@ -0,0 +1,182 @@
import os
import numpy as np
import requests
from PytorchBoot.runners import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from utils.pose_util import PoseUtil
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.view_util import ViewUtil
from scipy.spatial.transform import Rotation as R
@stereotype.runner("inference_runner")
class InferenceRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("inference")
self.inference_config = ConfigManager.get("runner", "inference")
self.server_url = self.inference_config["server_url"]
self.max_iter = self.inference_config["max_iter"]
self.max_fail = self.inference_config["max_fail"]
self.max_no_new_pts = self.inference_config["max_no_new_pts"]
self.min_delta_pts_num = self.inference_config["min_delta_pts_num"]
def check_stop(self, cnt, fail, no_new_pts):
if cnt > self.max_iter:
return True
if fail > self.max_fail:
return True
if no_new_pts > self.max_no_new_pts:
return True
return False
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold=0.005):
scan_pts = world_pts[world_pts[:, 2] < z_threshold]
obj_pts = world_pts[world_pts[:, 2] >= z_threshold]
return scan_pts, obj_pts
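    # Example of the split on a toy cloud (z in metres): with z_threshold=0.005,
    # a point at z=0.001 (scan table) goes to scan_pts and a point at z=0.1
    # (object) goes to obj_pts:
    #
    #   pts = np.array([[0, 0, 0.001], [0, 0, 0.1]])
    #   scan_pts, obj_pts = self.split_scan_pts_and_obj_pts(pts)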
def run(self):
ControlUtil.connect_robot()
ControlUtil.init()
scanned_pts_list = []
scanned_n_to_world_pose = []
cnt = 0
fail = 0
no_new_pts = 0
view_data = CommunicateUtil.get_view_data(init=True)
first_cam_to_real_world = ControlUtil.get_pose()
if view_data is None:
Log.error("No view data received")
fail += 1
return
cam_shot_pts = ViewUtil.get_pts(view_data)
# ########################################### DEBUG ###########################################
# sensor_pts = PtsUtil.transform_point_cloud(cam_shot_pts, np.linalg.inv(ControlUtil.CAMERA_TO_LEFT_CAMERA))
# np.savetxt('/home/yan20/Downloads/left_pts_0.txt', cam_shot_pts)
# np.savetxt('/home/yan20/Downloads/sensor_pts_0.txt', sensor_pts)
# #############################################################################################
world_shot_pts = PtsUtil.transform_point_cloud(
cam_shot_pts, first_cam_to_real_world
)
#import ipdb; ipdb.set_trace()
_, world_splitted_shot_pts = self.split_scan_pts_and_obj_pts(
world_shot_pts
)
curr_pts = world_splitted_shot_pts
curr_pose = first_cam_to_real_world
curr_pose_6d = PoseUtil.matrix_to_rotation_6d_numpy(curr_pose[:3,:3])
curr_pose_9d = np.concatenate([curr_pose_6d, curr_pose[:3, 3]], axis=0)
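        # The 9D pose is a 6D rotation parameterisation concatenated with the
        # translation. Assuming PoseUtil follows the common Zhou et al. 6D
        # convention (two rows/columns of R, re-orthonormalised on decode),
        # the round trip should hold up to numerical noise:
        #
        #   R_back = PoseUtil.rotation_6d_to_matrix_numpy(curr_pose_6d)
        #   assert np.allclose(R_back, curr_pose[:3, :3], atol=1e-6)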
scanned_pts_list.append(curr_pts.tolist())
scanned_n_to_world_pose.append(curr_pose_9d.tolist())
combined_pts = np.concatenate(scanned_pts_list, axis=0)
downsampled_combined_pts = PtsUtil.voxel_downsample_point_cloud(combined_pts, 0.003)
last_downsampled_combined_pts_num = downsampled_combined_pts.shape[0]
Log.info(f"First downsampled combined pts: {last_downsampled_combined_pts_num}")
####################################### DEBUG #######################################
# scan_count = 0
# save_path = "/home/yan20/Downloads/pts"
# if not os.path.exists(save_path):
# os.makedirs(save_path)
#####################################################################################
while not self.check_stop(cnt, fail, no_new_pts):
data = {
"scanned_pts": scanned_pts_list,
"scanned_n_to_world_pose_9d": scanned_n_to_world_pose
}
# pts = np.array(data['scanned_pts'][-1])
# np.savetxt(f'{save_path}/pts_{scan_count}.txt', pts)
# scan_count += 1
response = requests.post(self.server_url, json=data)
result = response.json()
pred_pose_9d = np.array(result["pred_pose_9d"])
pred_rot_6d = pred_pose_9d[0, :6]
pred_trans = pred_pose_9d[0, 6:]
pred_rot_mat = PoseUtil.rotation_6d_to_matrix_numpy(pred_rot_6d)
pred_pose = np.eye(4)
pred_pose[:3, :3] = pred_rot_mat
pred_pose[:3, 3] = pred_trans
target_camera_pose = pred_pose @ ControlUtil.CAMERA_CORRECTION
ControlUtil.move_to(target_camera_pose)
cnt += 1
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("No view data received")
fail += 1
continue
cam_shot_pts = ViewUtil.get_pts(view_data)
left_cam_to_first_left_cam = ViewUtil.get_camera_pose(view_data)
curr_pose = first_cam_to_real_world @ left_cam_to_first_left_cam @ np.linalg.inv(ControlUtil.CAMERA_CORRECTION)
# curr_pose = pred_pose
# curr_pose = first_cam_to_real_world @ ViewUtil.get_camera_pose(view_data)
            Log.info(f"pred_pose:\n{pred_pose}")
            Log.info(f"curr_pose:\n{curr_pose}")
##################################### DEBUG #####################################
# print(curr_pose)
# rot = R.from_matrix(curr_pose[:3, :3])
# quat_xyzw = rot.as_quat()
# translation = curr_pose[:3, 3]
# print(quat_xyzw, translation)
# # from ipdb import set_trace; set_trace()
#################################################################################
world_shot_pts = PtsUtil.transform_point_cloud(
cam_shot_pts, first_cam_to_real_world
)
_, world_splitted_shot_pts = self.split_scan_pts_and_obj_pts(
world_shot_pts
)
curr_pts = world_splitted_shot_pts
            # an unconditional breakpoint and an inline visualisation were left
            # active here; both stall the acquisition loop, so keep them off
            # unless debugging:
            # import ipdb; ipdb.set_trace()
            # from utils.vis import visualizeUtil
            # visualizeUtil.visualize_pts_and_camera(world_splitted_shot_pts, pred_pose)
curr_pose_6d = PoseUtil.matrix_to_rotation_6d_numpy(curr_pose[:3,:3])
curr_pose_9d = np.concatenate([curr_pose_6d, curr_pose[:3, 3]], axis=0)
scanned_pts_list.append(curr_pts.tolist())
scanned_n_to_world_pose.append(curr_pose_9d.tolist())
combined_pts = np.concatenate(scanned_pts_list, axis=0)
downsampled_combined_pts = PtsUtil.voxel_downsample_point_cloud(combined_pts, 0.003)
curr_downsampled_combined_pts_num = downsampled_combined_pts.shape[0]
Log.info(f"Downsampled combined pts: {curr_downsampled_combined_pts_num}")
            if curr_downsampled_combined_pts_num < last_downsampled_combined_pts_num + self.min_delta_pts_num:
                no_new_pts += 1
                Log.info(f"No new points, cnt: {cnt}, fail: {fail}, no_new_pts: {no_new_pts}")
                continue
            # enough new points arrived: advance the baseline and reset the
            # consecutive no-new-points counter; without this update the check
            # would forever compare against the very first scan
            last_downsampled_combined_pts_num = curr_downsampled_combined_pts_num
            no_new_pts = 0
Log.success("Inference finished")
# self.save_inference_result(scanned_pts_list, downsampled_combined_pts)
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
self.inference_result_dir = os.path.join(self.experiment_path, "inference_result")
os.makedirs(self.inference_result_dir)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
self.inference_result_dir = os.path.join(self.experiment_path, "inference_result")
def save_inference_result(self, scanned_pts_list, downsampled_combined_pts):
import time
dir_name = time.strftime("inference_result_%Y_%m_%d_%Hh%Mm%Ss", time.localtime())
        dir_path = os.path.join(self.inference_result_dir, dir_name)
        # np.savetxt does not create missing directories, so create the target first
        os.makedirs(dir_path, exist_ok=True)
for i in range(len(scanned_pts_list)):
np.savetxt(os.path.join(dir_path, f"{i}.txt"), np.array(scanned_pts_list[i]))
np.savetxt(os.path.join(dir_path, "downsampled_combined_pts.txt"), np.array(downsampled_combined_pts))
Log.success("Inference result saved")
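# The POST in InferenceRunner.run sends a JSON body with "scanned_pts" and
# "scanned_n_to_world_pose_9d" and expects a JSON reply carrying "pred_pose_9d"
# with shape (1, 9). The actual inference server is not part of this commit;
# the following is a minimal stand-in sketch (assuming Flask, and a route name
# that would have to match server_url in the config) that simply echoes the
# last scanned pose, so the robot stays put -- useful for wiring tests only:
#
#   from flask import Flask, request, jsonify
#
#   app = Flask(__name__)
#
#   @app.route("/inference", methods=["POST"])
#   def inference():
#       data = request.get_json()
#       last_pose_9d = data["scanned_n_to_world_pose_9d"][-1]
#       return jsonify({"pred_pose_9d": [last_pose_9d]})
#
#   if __name__ == "__main__":
#       app.run(port=5000)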