From ba36803fba08e293626044c07d51d18e0f69b93d Mon Sep 17 00:00:00 2001 From: hofee Date: Wed, 9 Oct 2024 21:46:13 +0800 Subject: [PATCH] finish pipeline --- .gitignore | 4 +- configs/cad_config.yaml | 8 +- configs/render_config.yaml | 23 ------ runners/cad_strategy.py | 125 ++++++++++++++++++------------ utils/communicate_util.py | 22 ++++-- utils/control_util.py | 151 +++++++++++++++++++++++-------------- utils/preprocess_util.py | 3 +- utils/view_util.py | 127 +++++++++++++++++++++++++++++++ 8 files changed, 325 insertions(+), 138 deletions(-) delete mode 100644 configs/render_config.yaml create mode 100644 utils/view_util.py diff --git a/.gitignore b/.gitignore index 6dfd75d..c094c33 100644 --- a/.gitignore +++ b/.gitignore @@ -6,7 +6,9 @@ __pycache__/ test_output/ # C extensions *.so - +*.txt +experiments/ +temp_output/ # Distribution / packaging .Python build/ diff --git a/configs/cad_config.yaml b/configs/cad_config.yaml index e962b93..8a2ab32 100644 --- a/configs/cad_config.yaml +++ b/configs/cad_config.yaml @@ -10,8 +10,10 @@ runner: root_dir: "experiments" generate: - model_dir: "/home/user/nbv_rec/data/models" - table_model_path: "/home/user/nbv_rec/data/table.obj" + blender_bin_path: /home/yan20/Desktop/nbv_rec/project/blender_app/blender-4.2.2-linux-x64/blender + generator_script_path: /home/yan20/Desktop/nbv_rec/project/blender_app/data_generator.py + model_dir: "/home/yan20/Desktop/nbv_rec/data/models" + table_model_path: "/home/yan20/Desktop/nbv_rec/data/table.obj" model_start_idx: 0 voxel_size: 0.005 max_view: 512 @@ -26,7 +28,7 @@ runner: near_plane: 0.01 far_plane: 5 fov_vertical: 25 - resolution: [1280,800] + resolution: [640,400] eye_distance: 0.15 eye_angle: 25 Light: diff --git a/configs/render_config.yaml b/configs/render_config.yaml deleted file mode 100644 index bac4f56..0000000 --- a/configs/render_config.yaml +++ /dev/null @@ -1,23 +0,0 @@ - -runner: - general: - seed: 1 - device: cpu - cuda_visible_devices: "0,1,2,3,4,5,6,7" - - experiment: - name: debug - root_dir: "experiments" - - web: - host: “0.0.0.0” - port: 11111 - render: - model_dir: "/home/yan20/nbv_rec/data/test_CAD/test_model" - - - reconstruct: - soft_overlap_threshold: 0.3 - hard_overlap_threshold: 0.6 - scan_points_threshold: 10 - \ No newline at end of file diff --git a/runners/cad_strategy.py b/runners/cad_strategy.py index 2a5c402..a34c5d9 100644 --- a/runners/cad_strategy.py +++ b/runners/cad_strategy.py @@ -1,4 +1,5 @@ import os +import time import trimesh import tempfile import subprocess @@ -15,6 +16,8 @@ from utils.pts_util import PtsUtil from utils.reconstruction_util import ReconstructionUtil from utils.preprocess_util import save_scene_data from utils.data_load import DataLoadUtil +from utils.view_util import ViewUtil + @stereotype.runner("CAD_strategy_runner") class CADStrategyRunner(Runner): @@ -29,6 +32,8 @@ class CADStrategyRunner(Runner): } self.generate_config = ConfigManager.get("runner", "generate") self.reconstruct_config = ConfigManager.get("runner", "reconstruct") + self.blender_bin_path = self.generate_config["blender_bin_path"] + self.generator_script_path = self.generate_config["generator_script_path"] self.model_dir = self.generate_config["model_dir"] self.voxel_size = self.generate_config["voxel_size"] self.max_view = self.generate_config["max_view"] @@ -47,13 +52,6 @@ class CADStrategyRunner(Runner): def load_experiment(self, backup_name=None): super().load_experiment(backup_name) - - def get_pts_from_view_data(self, view_data): - depth = view_data["depth_image"] - depth_intrinsics = view_data["depth_intrinsics"] - depth_extrinsics = view_data["depth_extrinsics"] - cam_pts = PtsUtil.get_pts_from_depth(depth, depth_intrinsics, depth_extrinsics) - return cam_pts def split_scan_pts_and_obj_pts(self, world_pts, scan_pts_z, z_threshold = 0.003): scan_pts = world_pts[scan_pts_z < z_threshold] @@ -61,56 +59,65 @@ class CADStrategyRunner(Runner): return scan_pts, obj_pts def run_one_model(self, model_name): - + shot_pts_list = [] + ControlUtil.connect_robot() ''' init robot ''' - #ControlUtil.init() + Log.info("[Part 1/5] start init and register") + ControlUtil.init() ''' load CAD model ''' - model_path = os.path.join(self.model_dir, model_name,"mesh.obj") + model_path = os.path.join(self.model_dir, model_name,"mesh.ply") + temp_name = "cad_model_world" cad_model = trimesh.load(model_path) ''' take first view ''' - #view_data = CommunicateUtil.get_view_data(init=True) - #first_cam_pts = self.get_pts_from_view_data(view_data) + Log.info("[Part 1/5] take first view data") + view_data = CommunicateUtil.get_view_data(init=True) + first_cam_pts = ViewUtil.get_pts(view_data) + #shot_pts_list.append(first_cam_pts) ''' register ''' - #cad_to_cam = PtsUtil.register(first_cam_pts, cad_model) - #cam_to_world = ControlUtil.get_pose() - cad_to_world = np.eye(4) #cam_to_world @ cad_to_cam - + Log.info("[Part 1/5] do registeration") + cam_to_cad = PtsUtil.register(first_cam_pts, cad_model) + first_cam_to_world = ControlUtil.get_pose() + cad_to_world = first_cam_to_world @ np.linalg.inv(cam_to_cad) + Log.success("[Part 1/5] finish init and register") world_to_blender_world = np.eye(4) world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215]) - cad_to_blender_world = np.dot(world_to_blender_world, cad_to_world) + cad_to_blender_world = world_to_blender_world @ cad_to_world + temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output" + cad_model:trimesh.Trimesh = cad_model.apply_transform(cad_to_blender_world) + cad_model.export(os.path.join(temp_dir, f"{temp_name}.obj")) with tempfile.TemporaryDirectory() as temp_dir: - name = "cad_model_world" + temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output" - cad_model.export(os.path.join(temp_dir, f"{name}.obj")) - temp_dir = "/home/user/nbv_rec/nbv_rec_control/test_output" - scene_dir = os.path.join(temp_dir, name) - script_path = "/home/user/nbv_rec/blender_app/data_generator.py" + cad_model.export(os.path.join(temp_dir, f"{temp_name}.obj")) + scene_dir = os.path.join(temp_dir, temp_name) ''' sample view ''' - # import ipdb; ipdb.set_trace() - # print("start running renderer") - # result = subprocess.run([ - # 'blender', '-b', '-P', script_path, '--', temp_dir - # ], capture_output=True, text=True) - # print("finish running renderer") - # + Log.info("[Part 2/5] start running renderer") + result = subprocess.run([ + self.blender_bin_path, '-b', '-P', self.generator_script_path, '--', temp_dir + ], capture_output=True, text=True) + Log.success("[Part 2/5] finish running renderer") + world_model_points = np.loadtxt(os.path.join(scene_dir, "points_and_normals.txt"))[:,:3] ''' preprocess ''' - # save_scene_data(temp_dir, name) + Log.info("[Part 3/5] start preprocessing data") + save_scene_data(temp_dir, temp_name) + Log.success("[Part 3/5] finish preprocessing data") - pts_dir = os.path.join(temp_dir,name,"pts") + pts_dir = os.path.join(temp_dir,temp_name,"pts") sample_view_pts_list = [] scan_points_idx_list = [] frame_num = len(os.listdir(pts_dir)) for frame_idx in range(frame_num): - pts_path = os.path.join(temp_dir,name, "pts", f"{frame_idx}.txt") - idx_path = os.path.join(temp_dir,name, "scan_points_indices", f"{frame_idx}.txt") + pts_path = os.path.join(temp_dir,temp_name, "pts", f"{frame_idx}.txt") + idx_path = os.path.join(temp_dir,temp_name, "scan_points_indices", f"{frame_idx}.txt") point_cloud = np.loadtxt(pts_path) - sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud, self.voxel_size) + if point_cloud.shape[0] != 0: + sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud, self.voxel_size) indices = np.loadtxt(idx_path, dtype=np.int32) try: len(indices) @@ -121,6 +128,7 @@ class CADStrategyRunner(Runner): ''' generate strategy ''' + Log.info("[Part 4/5] start generating strategy") limited_useful_view, _, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap( world_model_points, sample_view_pts_list, scan_points_indices_list = scan_points_idx_list, @@ -131,28 +139,49 @@ class CADStrategyRunner(Runner): scan_points_threshold = self.scan_points_threshold, status_info=self.status_info ) - + Log.success("[Part 4/5] finish generating strategy") + ''' extract cam_to_world sequence ''' cam_to_world_seq = [] coveraget_rate_seq = [] - - from ipdb import set_trace; set_trace() + render_pts = [] + idx_seq = [] for idx, coverage_rate in limited_useful_view: - path = DataLoadUtil.get_path(temp_dir, name, idx) + path = DataLoadUtil.get_path(temp_dir, temp_name, idx) cam_info = DataLoadUtil.load_cam_info(path, binocular=True) cam_to_world_seq.append(cam_info["cam_to_world"]) coveraget_rate_seq.append(coverage_rate) + idx_seq.append(idx) + render_pts.append(sample_view_pts_list[idx]) - # ''' take best seq view ''' - # for cam_to_world in cam_to_world_seq: - # ControlUtil.move_to(cam_to_world) - # ''' get world pts ''' - # view_data = CommunicateUtil.get_view_data() - # cam_pts = self.get_pts_from_view_data(view_data) - # scan_points_idx = None - # world_pts = PtsUtil.transform_point_cloud(cam_pts, cam_to_world) - # sample_view_pts_list.append(world_pts) - # scan_points_idx_list.append(scan_points_idx) + Log.info("[Part 5/5] start running robot") + ''' take best seq view ''' + + for cam_to_world in cam_to_world_seq: + ControlUtil.move_to(cam_to_world) + ''' get world pts ''' + time.sleep(1) + view_data = CommunicateUtil.get_view_data() + if view_data is None: + Log.error("Failed to get view data") + continue + cam_pts = ViewUtil.get_pts(view_data) + shot_pts_list.append(cam_pts) + + #import ipdb;ipdb.set_trace() + print(idx_seq) + + for idx in range(len(shot_pts_list)): + if not os.path.exists(os.path.join(temp_dir, temp_name, "shot_pts")): + os.makedirs(os.path.join(temp_dir, temp_name, "shot_pts")) + if not os.path.exists(os.path.join(temp_dir, temp_name, "render_pts")): + os.makedirs(os.path.join(temp_dir, temp_name, "render_pts")) + shot_pts = PtsUtil.transform_point_cloud(shot_pts_list[idx], first_cam_to_world) + np.savetxt(os.path.join(temp_dir, temp_name, "shot_pts", f"{idx}.txt"), shot_pts) + np.savetxt(os.path.join(temp_dir, temp_name, "render_pts", f"{idx}.txt"), render_pts[idx]) + + + Log.success("[Part 5/5] finish running robot") def run(self): diff --git a/utils/communicate_util.py b/utils/communicate_util.py index 6abe5dc..3f34954 100644 --- a/utils/communicate_util.py +++ b/utils/communicate_util.py @@ -1,23 +1,35 @@ import requests import numpy as np -import cv2 - class CommunicateUtil: - VIEW_HOST = "127.0.0.1:5000" - INFERENCE_HOST = "127.0.0.1:5000" + VIEW_HOST = "192.168.1.2:7999" #"10.7.250.52:7999" ## + INFERENCE_HOST = "127.0.0.1" + INFERENCE_PORT = 5000 def get_view_data(init = False) -> dict: params = {} params["create_scanner"] = init response = requests.get(f"http://{CommunicateUtil.VIEW_HOST}/api/data", json=params) data = response.json() + if not data["success"]: print(f"Failed to get view data") return None - return data + + image_id = data["image_id"] + depth_image = np.array(data["depth_image"], dtype=np.uint16) + depth_intrinsics = data["depth_intrinsics"] + depth_extrinsics = np.array(data["depth_extrinsics"]) + view_data = { + "image_id": image_id, + "depth_image": depth_image, + "depth_intrinsics": depth_intrinsics, + "depth_extrinsics": depth_extrinsics + } + return view_data def get_inference_data(view_data:dict) -> dict: data = {} return data + \ No newline at end of file diff --git a/utils/control_util.py b/utils/control_util.py index 3af6b20..7345eb2 100644 --- a/utils/control_util.py +++ b/utils/control_util.py @@ -1,41 +1,49 @@ import numpy as np from frankapy import FrankaArm from autolab_core import RigidTransform +import serial +import time class ControlUtil: - #__fa = FrankaArm(robot_num=2) + __fa:FrankaArm = None + __ser: serial.Serial = None + cnt_rotation = 0 BASE_TO_WORLD:np.ndarray = np.asarray([ - [1, 0, 0, -0.5], - [0, 1, 0, 0], - [0, 0, 1, -0.2], + [1, 0, 0, -0.7323], + [0, 1, 0, 0.05926], + [0, 0, 1, -0.21], [0, 0, 0, 1] ]) CAMERA_TO_GRIPPER:np.ndarray = np.asarray([ - [1, 0, 0, 0], - [0, 1, 0, 0], - [0, 0, 1, 0], + [0, -1, 0, 0.01], + [1, 0, 0, -0.05], + [0, 0, 1, 0.075], [0, 0, 0, 1] ]) - theta = np.radians(25) - INIT_POSE:np.ndarray = np.asarray([ - [np.cos(theta), 0, -np.sin(theta), 0], - [0, -1, 0, 0], - [-np.sin(theta), 0, -np.cos(theta), 0.35], - [0, 0, 0, 1] + INIT_GRIPPER_POSE:np.ndarray = np.asarray([ + [ 0.44808722 , 0.61103352 , 0.65256787 , 0.36428118], + [ 0.51676868 , -0.77267257 , 0.36866054, -0.26519364], + [ 0.72948524 , 0.17203456 ,-0.66200043 , 0.60938969], + [ 0. , 0. , 0. , 1. ] ]) + - AXIS_THRESHOLD = (-(np.pi+5e-2)/2, (np.pi+5e-2)/2) - + @staticmethod + def connect_robot(): + ControlUtil.__ser = serial.Serial(port="/dev/ttyUSB0", baudrate=115200) + ControlUtil.__fa = FrankaArm(robot_num=2) + @staticmethod def franka_reset() -> None: ControlUtil.__fa.reset_joints() @staticmethod def init() -> None: - ControlUtil.set_pose(ControlUtil.INIT_POSE) + ControlUtil.franka_reset() + ControlUtil.set_gripper_pose(ControlUtil.INIT_GRIPPER_POSE) @staticmethod def get_pose() -> np.ndarray: @@ -46,17 +54,35 @@ class ControlUtil: @staticmethod def set_pose(cam_to_world: np.ndarray) -> None: gripper_to_base = ControlUtil.solve_gripper_to_base(cam_to_world) - gripper_to_base = RigidTransform(rotation=gripper_to_base[:3, :3], translation=gripper_to_base[:3, 3], from_frame="franka_tool", to_frame="world") - ControlUtil.__fa.goto_pose(gripper_to_base, use_impedance=False, ignore_errors=False) + ControlUtil.set_gripper_pose(gripper_to_base) @staticmethod def rotate_display_table(degree): - pass + turn_directions = { + "left": 0, + "right": 1 + } + ControlUtil.cnt_rotation += degree + + print("total rot:", ControlUtil.cnt_rotation) + if degree >= 0: + turn_angle = degree + turn_direction = turn_directions["right"] + else: + turn_angle = -degree + turn_direction = turn_directions["left"] + write_len = ControlUtil.__ser.write(f"CT+TRUNSINGLE({turn_direction},{turn_angle});".encode('utf-8')) + @staticmethod def get_curr_gripper_to_base_pose() -> np.ndarray: return ControlUtil.__fa.get_pose().matrix + @staticmethod + def set_gripper_pose(gripper_to_base: np.ndarray) -> None: + gripper_to_base = RigidTransform(rotation=gripper_to_base[:3, :3], translation=gripper_to_base[:3, 3], from_frame="franka_tool", to_frame="world") + ControlUtil.__fa.goto_pose(gripper_to_base, duration=5, use_impedance=False, ignore_errors=False) + @staticmethod def solve_gripper_to_base(cam_to_world: np.ndarray) -> np.ndarray: return np.linalg.inv(ControlUtil.BASE_TO_WORLD) @ cam_to_world @ np.linalg.inv(ControlUtil.CAMERA_TO_GRIPPER) @@ -66,24 +92,38 @@ class ControlUtil: return ControlUtil.BASE_TO_WORLD @ gripper_to_base @ ControlUtil.CAMERA_TO_GRIPPER @staticmethod - def solve_display_table_rot_and_cam_to_world(cam_to_world: np.ndarray) -> tuple: - gripper_to_base = ControlUtil.solve_gripper_to_base(cam_to_world) - gripper_to_base_axis_angle = ControlUtil.get_gripper_to_base_axis_angle(gripper_to_base) + def check_limit(new_cam_to_world): + if new_cam_to_world[0,3] > 0 or new_cam_to_world[1,3] > 0: + return False + x = abs(new_cam_to_world[0,3]) + y = abs(new_cam_to_world[1,3]) - if ControlUtil.AXIS_THRESHOLD[0] <= gripper_to_base_axis_angle <= ControlUtil.AXIS_THRESHOLD[1]: + tan_y_x = y/x + if tan_y_x < np.sqrt(3)/3 or tan_y_x > np.sqrt(3): + return False + + return True + + @staticmethod + def solve_display_table_rot_and_cam_to_world(cam_to_world: np.ndarray) -> tuple: + if ControlUtil.check_limit(cam_to_world): return 0, cam_to_world else: - for display_table_rot in np.linspace(0.1,180, 1800): + min_display_table_rot = 180 + min_new_cam_to_world = None + for display_table_rot in np.linspace(0.1,360, 1800): display_table_rot_z_pose = ControlUtil.get_z_axis_rot_mat(display_table_rot) new_cam_to_world = display_table_rot_z_pose @ cam_to_world - if ControlUtil.AXIS_THRESHOLD[0] <= ControlUtil.get_gripper_to_base_axis_angle(new_cam_to_world) <= ControlUtil.AXIS_THRESHOLD[1]: - return -display_table_rot, new_cam_to_world - - display_table_rot = -display_table_rot - display_table_rot_z_pose = ControlUtil.get_z_axis_rot_mat(display_table_rot) - new_cam_to_world = display_table_rot_z_pose @ cam_to_world - if ControlUtil.AXIS_THRESHOLD[0] <= ControlUtil.get_gripper_to_base_axis_angle(new_cam_to_world) <= ControlUtil.AXIS_THRESHOLD[1]: - return -display_table_rot, new_cam_to_world + if ControlUtil.check_limit(new_cam_to_world): + if display_table_rot < min_display_table_rot: + min_display_table_rot, min_new_cam_to_world = display_table_rot, new_cam_to_world + if abs(display_table_rot - 360) < min_display_table_rot: + min_display_table_rot, min_new_cam_to_world = display_table_rot - 360, new_cam_to_world + + if min_new_cam_to_world is None: + raise ValueError("No valid display table rotation found") + + return min_display_table_rot, min_new_cam_to_world @staticmethod def get_z_axis_rot_mat(degree): @@ -107,35 +147,32 @@ class ControlUtil: def move_to(pose: np.ndarray): rot_degree, cam_to_world = ControlUtil.solve_display_table_rot_and_cam_to_world(pose) print("table rot degree:", rot_degree) + exec_time = rot_degree/9 + start_time = time.time() ControlUtil.rotate_display_table(rot_degree) ControlUtil.set_pose(cam_to_world) - - + end_time = time.time() + if end_time - start_time < exec_time: + time.sleep(exec_time - (end_time - start_time)) # ----------- Debug Test ------------- if __name__ == "__main__": + ControlUtil.connect_robot() + print(ControlUtil.get_curr_gripper_to_base_pose()) ControlUtil.init() - import time - start = time.time() - rot_degree, cam_to_world = ControlUtil.solve_display_table_rot_and_cam_to_world(ControlUtil.INIT_POSE) - end = time.time() - print(f"Time: {end-start}") - print(rot_degree, cam_to_world) - # test_pose = np.asarray([ - # [1, 0, 0, 0.4], - # [0, -1, 0, 0], - # [0, 0, -1, 0.6], - # [0, 0, 0, 1] - # ]) - # ControlUtil.set_pose(test_pose) - # print(ControlUtil.get_pose()) - # ControlUtil.reset() - # print(ControlUtil.get_pose()) +# import time +# start = time.time() +# test_pose = np.asarray([[ 0.06023737 ,-0.81328565, 0.57872715, -0.23602393], +# [-0.99107872 ,-0.11773834, -0.06230158, 0.24174289], +# [ 0.11880735 ,-0.56981127 ,-0.813138 , 0.15199069], +# [ 0. , 0. , 0. , 1. , ]]) +# print(test_pose) +# rot_degree, cam_to_world = ControlUtil.solve_display_table_rot_and_cam_to_world(test_pose) +# print(rot_degree, cam_to_world) +# end = time.time() +# print(f"Time: {end-start}") - angle = ControlUtil.get_gripper_to_base_axis_angle(ControlUtil.solve_gripper_to_base(cam_to_world)) - threshold = ControlUtil.AXIS_THRESHOLD - - angle_degree = np.degrees(angle) - threshold_degree = np.degrees(threshold[0]), np.degrees(threshold[1]) - print(f"Angle: {angle_degree}, range: {threshold_degree}") - ControlUtil.set_pose(cam_to_world) \ No newline at end of file +# ControlUtil.set_pose(test_pose) +# import ipdb; ipdb.set_trace() +# ControlUtil.move_to(test_pose) +# print("End!") diff --git a/utils/preprocess_util.py b/utils/preprocess_util.py index 83b1163..737b48a 100644 --- a/utils/preprocess_util.py +++ b/utils/preprocess_util.py @@ -8,6 +8,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.reconstruction_util import ReconstructionUtil from utils.data_load import DataLoadUtil from utils.pts_util import PtsUtil +from PytorchBoot.utils.log_util import Log def save_np_pts(path, pts: np.ndarray, file_type="txt"): if file_type == "txt": @@ -75,7 +76,7 @@ def save_scene_data(root, scene, scene_idx=0, scene_total=1,file_type="txt"): ''' read frame data(depth|mask|normal) ''' frame_num = DataLoadUtil.get_scene_seq_length(root, scene) for frame_id in range(frame_num): - print(f"[scene({scene_idx}/{scene_total})|frame({frame_id}/{frame_num})]Processing {scene} frame {frame_id}") + Log.info(f"[frame({frame_id}/{frame_num})]Processing {scene} frame {frame_id}") path = DataLoadUtil.get_path(root, scene, frame_id) cam_info = DataLoadUtil.load_cam_info(path, binocular=True) depth_L, depth_R = DataLoadUtil.load_depth( diff --git a/utils/view_util.py b/utils/view_util.py new file mode 100644 index 0000000..b9ba6f2 --- /dev/null +++ b/utils/view_util.py @@ -0,0 +1,127 @@ +import numpy as np +from scipy.spatial.transform import Rotation as R +from dataclasses import dataclass + + +@dataclass +class CameraIntrinsics: + width: int + height: int + fx: float + fy: float + cx: float + cy: float + + @property + def intrinsic_matrix(self): + return np.array([[self.fx, 0, self.cx], [0, self.fy, self.cy], [0, 0, 1]]) + + +@dataclass +class CameraExtrinsics: + def __init__(self, rotation: np.ndarray, translation: np.ndarray, rot_type: str): + """ + rotation: 3x3 rotation matrix or 1x3 euler angles or 1x4 quaternion + translation: 1x3 or 3x1 translation vector + rot_type: "mat", "euler_xyz", "quat_xyzw" + """ + assert rot_type in ["mat", "euler_xyz", "quat_xyzw"] + if rot_type == "mat": + self._rot = R.from_matrix(rotation) + elif rot_type == "euler_xyz": + self._rot = R.from_euler('xyz', rotation, degrees=True) + elif rot_type == "quat_xyzw": + self._rot = R.from_quat(rotation) + self._translation = translation + + @property + def extrinsic_matrix(self): + return np.vstack([np.hstack([self._rot.as_matrix(), self._translation.reshape(3, 1)]), [0, 0, 0, 1]]) + + @property + def rotation_euler_xyz(self): + return self._rot.as_euler('xyz', degrees=True) + + @property + def rotation_quat_xyzw(self): + return self._rot.as_quat() + + @property + def rotation_matrix(self): + return self._rot.as_matrix() + + @property + def translation(self): + return self._translation + + +@dataclass +class CameraData: + def __init__(self, depth_image: np.ndarray, image_id: int, intrinsics: CameraIntrinsics, extrinsics: CameraExtrinsics): + self._depth_image = depth_image + self._image_id = image_id + self._intrinsics = intrinsics + self._extrinsics = extrinsics + + @property + def depth_image(self): + return self._depth_image + + @property + def image_id(self): + return self._image_id + + @property + def intrinsics(self): + return self._intrinsics.intrinsic_matrix + + @property + def extrinsics(self): + return self._extrinsics.extrinsic_matrix + + @property + def projection_matrix(self): + return self.intrinsics @ self.extrinsics[:3, :4] + + @property + def pts_camera(self): + height, width = self.depth_image.shape + v, u = np.indices((height, width)) + points = np.vstack([u.flatten(), v.flatten(), np.ones_like(u.flatten())]) # 3xN + points = np.linalg.inv(self.intrinsics) @ points # 3xN + points = points.T # Nx3 + points = points * self.depth_image.flatten()[:, np.newaxis] # Nx3 + return points + + @property + def pts_world(self): + homogeneous_pts = np.hstack([self.pts_camera, np.ones((self.pts_camera.shape[0], 1))]) # Nx4 + pts_world = self.extrinsics @ homogeneous_pts.T # 4xN + return pts_world[:3, :].T + +class ViewUtil: + def get_pts(view_data): + image_id = view_data["image_id"] + depth_intrinsics = view_data["depth_intrinsics"] + depth_extrinsics = view_data["depth_extrinsics"] + depth_image = np.array(view_data["depth_image"], dtype=np.uint16) + if image_id is None: + return None + else: + print(f"Image ID: {image_id}") + camera_intrinsics = CameraIntrinsics( + depth_intrinsics['width'], + depth_intrinsics['height'], + depth_intrinsics['fx'], + depth_intrinsics['fy'], + depth_intrinsics['cx'], + depth_intrinsics['cy'] + ) + camera_extrinsics = CameraExtrinsics( + depth_extrinsics[:3, :3], + depth_extrinsics[:3, 3], + rot_type="mat" + ) + camera_data = CameraData(depth_image, image_id, camera_intrinsics, camera_extrinsics) + pts = camera_data.pts_world + return pts/1000 \ No newline at end of file