Compare commits

...

20 Commits

Author  SHA1        Message                        Date
hofee   307994c20d  update                         2024-10-18 17:13:45 +08:00
hofee   20514be419  update displaytable rotation   2024-10-14 19:37:34 +08:00
        2f87a2626c  update                         2024-10-13 19:47:05 +08:00
        41ee79db0c  update config                  2024-10-13 15:24:41 +08:00
hofee   07dcdb3452  add close loop control         2024-10-12 23:11:25 +08:00
        8d43d4de60  update                         2024-10-12 20:25:55 +08:00
        3fe74eb6eb  update                         2024-10-12 16:39:00 +08:00
        cd85fed3a0  update                         2024-10-10 21:48:55 +08:00
hofee   8fd2d6b1e1  optimize                       2024-10-10 15:13:40 +08:00
hofee   f6c4db859e  add multiprocess               2024-10-10 14:42:57 +08:00
        ba36803fba  finish pipeline                2024-10-09 21:46:13 +08:00
hofee   f8514564c1  solve merge                    2024-10-08 21:33:17 +08:00
hofee   3ab046b134  update                         2024-10-08 21:28:30 +08:00
        3f1ae95ff8  add renderer                   2024-10-08 16:16:54 +08:00
hofee   d9d2716ba7  debug view sample              2024-10-07 22:03:50 -05:00
hofee   825f8652d5  finish register                2024-10-08 00:24:22 +08:00
hofee   2209acce1b  update                         2024-10-07 21:48:24 +08:00
hofee   dc769c5c1f  remove pycache                 2024-10-07 16:44:03 +08:00
        e9dfde6b9c  udpate                         2024-10-07 16:20:56 +08:00
        49fdb7f948  finish get_pose and set_pose   2024-10-06 21:52:05 +08:00
19 changed files with 2524 additions and 16 deletions

165
.gitignore vendored Normal file

@@ -0,0 +1,165 @@
# ---> Python
# Byte-compiled / optimized / DLL files
test/
__pycache__/
*.py[cod]
*$py.class
test_output/
# C extensions
*.so
*.txt
experiments/
temp_output/
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

16
app_cad.py Normal file

@@ -0,0 +1,16 @@
from PytorchBoot.application import PytorchBootApplication
from runners.cad_open_loop_strategy import CADOpenLoopStrategyRunner
from runners.cad_close_loop_strategy import CADCloseLoopStrategyRunner
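# each @PytorchBootApplication registers a named entry point: "cad_ol" runs the
# open-loop strategy, "cad_cl" the close-loop one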
@PytorchBootApplication("cad_ol")
class AppCADOpenLoopStrategy:
@staticmethod
def start():
CADOpenLoopStrategyRunner("configs/cad_open_loop_config.yaml").run()
@PytorchBootApplication("cad_cl")
class AppCADCloseLoopStrategy:
@staticmethod
def start():
CADCloseLoopStrategyRunner("configs/cad_close_loop_config.yaml").run()

15
combine_all_pts.py Normal file

@@ -0,0 +1,15 @@
import numpy as np
import os
if __name__ == "__main__":
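    # merge every per-view point cloud under pts_dir_path into a single combined cloud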
pts_dir_path = "/home/yan20/nbv_rec/project/franka_control/temp_output/cad_model_world/pts"
pts_dir = os.listdir(pts_dir_path)
pts_list = []
for i in range(len(pts_dir)):
pts_path = os.path.join(pts_dir_path, pts_dir[i])
pts = np.loadtxt(pts_path)
pts_list.append(pts)
combined_pts = np.vstack(pts_list)
path = "/home/yan20/nbv_rec/project/franka_control"
np.savetxt(os.path.join(path, "combined_pts.txt"), combined_pts)

46
configs/cad_close_loop_config.yaml Normal file

@@ -0,0 +1,46 @@
runner:
general:
seed: 1
device: cpu
cuda_visible_devices: "0,1,2,3,4,5,6,7"
experiment:
name: debug
root_dir: "experiments"
generate:
blender_bin_path: /home/yan20/Desktop/nbv_rec/project/blender_app/blender-4.2.2-linux-x64/blender
generator_script_path: /home/yan20/Desktop/nbv_rec/project/blender_app/data_generator.py
model_dir: "/home/yan20/Desktop/nbv_rec/data/models"
table_model_path: "/home/yan20/Desktop/nbv_rec/data/table.obj"
model_start_idx: 0
voxel_size: 0.002
max_shot_view_num: 50
min_shot_new_pts_num: 10
min_coverage_increase: 0.001
max_view: 64
min_view: 32
max_diag: 0.7
min_diag: 0.01
random_view_ratio: 0
min_cam_table_included_degree: 20
obj_name: "bear"
light_and_camera_config:
Camera:
near_plane: 0.01
far_plane: 5
fov_vertical: 25
resolution: [640,400]
eye_distance: 0.15
eye_angle: 25
Light:
location: [0,0,3.5]
orientation: [0,0,0]
power: 150
reconstruct:
soft_overlap_threshold: 0.3
hard_overlap_threshold: 0.6
scan_points_threshold: 10

43
configs/cad_open_loop_config.yaml Normal file

@@ -0,0 +1,43 @@
runner:
general:
seed: 1
device: cpu
cuda_visible_devices: "0,1,2,3,4,5,6,7"
experiment:
name: debug
root_dir: "experiments"
generate:
blender_bin_path: /home/yan20/Desktop/nbv_rec/project/blender_app/blender-4.2.2-linux-x64/blender
generator_script_path: /home/yan20/Desktop/nbv_rec/project/blender_app/data_generator.py
model_dir: "/home/yan20/Desktop/nbv_rec/data/models"
table_model_path: "/home/yan20/Desktop/nbv_rec/data/table.obj"
model_start_idx: 0
voxel_size: 0.002
max_view: 512
min_view: 128
max_diag: 0.7
min_diag: 0.01
random_view_ratio: 0
min_cam_table_included_degree: 20
obj_name: "bear"
light_and_camera_config:
Camera:
near_plane: 0.01
far_plane: 5
fov_vertical: 25
resolution: [640,400]
eye_distance: 0.15
eye_angle: 25
Light:
location: [0,0,3.5]
orientation: [0,0,0]
power: 150
reconstruct:
soft_overlap_threshold: 0.3
hard_overlap_threshold: 0.6
scan_points_threshold: 10

49
load_normal.py Normal file

@@ -0,0 +1,49 @@
import cv2
import os
import numpy as np
def load_normal(path, binocular=False, left_only=False):
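    # normal maps are stored as 8-bit PNGs; map [0, 255] back to unit range [-1, 1]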
if binocular and not left_only:
normal_path_L = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
)
normal_image_L = cv2.imread(normal_path_L, cv2.IMREAD_UNCHANGED)
normal_path_R = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_R.png"
)
normal_image_R = cv2.imread(normal_path_R, cv2.IMREAD_UNCHANGED)
normalized_normal_image_L = normal_image_L / 255.0 * 2.0 - 1.0
normalized_normal_image_R = normal_image_R / 255.0 * 2.0 - 1.0
return normalized_normal_image_L, normalized_normal_image_R
else:
if binocular and left_only:
normal_path = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
)
else:
normal_path = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + ".png"
)
normal_image = cv2.imread(normal_path, cv2.IMREAD_UNCHANGED)
normalized_normal_image = normal_image / 255.0 * 2.0 - 1.0
return normalized_normal_image
def show_rgb(event, x, y, flags, param):
if event == cv2.EVENT_MOUSEMOVE:
pixel_value = param[y, x]
print(f"RGB at ({x},{y}): {pixel_value}")
if __name__ == "__main__":
path = "/Users/hofee/temp/1"
normal_image = load_normal(path, binocular=True, left_only=True)
display_image = ((normal_image + 1.0) / 2.0 * 255).astype(np.uint8)
cv2.namedWindow("Normal Image")
cv2.setMouseCallback("Normal Image", show_rgb, param=display_image)
while True:
cv2.imshow("Normal Image", display_image)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()

244
runners/cad_close_loop_strategy.py Normal file

@@ -0,0 +1,244 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data, save_scene_data_multithread
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
@stereotype.runner("CAD_close_loop_strategy_runner")
class CADCloseLoopStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_strategy")
self.status_info = {
"status_manager": status_manager,
"app_name": "cad",
"runner_name": "CAD_close_loop_strategy_runner",
}
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.model_dir = self.generate_config["model_dir"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_view = self.generate_config["max_view"]
self.min_view = self.generate_config["min_view"]
self.max_diag = self.generate_config["max_diag"]
self.min_diag = self.generate_config["min_diag"]
self.min_cam_table_included_degree = self.generate_config[
"min_cam_table_included_degree"
]
self.max_shot_view_num = self.generate_config["max_shot_view_num"]
self.min_shot_new_pts_num = self.generate_config["min_shot_new_pts_num"]
self.min_coverage_increase = self.generate_config["min_coverage_increase"]
self.random_view_ratio = self.generate_config["random_view_ratio"]
self.soft_overlap_threshold = self.reconstruct_config["soft_overlap_threshold"]
self.hard_overlap_threshold = self.reconstruct_config["hard_overlap_threshold"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold=0):
scan_pts = world_pts[world_pts[:, 2] < z_threshold]
obj_pts = world_pts[world_pts[:, 2] >= z_threshold]
return scan_pts, obj_pts
def run_one_model(self, model_name):
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
ControlUtil.connect_robot()
""" init robot """
Log.info("[Part 1/5] start init and register")
ControlUtil.init()
""" load CAD model """
model_path = os.path.join(self.model_dir, model_name, "mesh.ply")
temp_name = "cad_model_world"
cad_model = trimesh.load(model_path)
""" take first view """
Log.info("[Part 1/5] take first view data")
view_data = CommunicateUtil.get_view_data(init=True)
first_cam_pts = ViewUtil.get_pts(view_data)
first_cam_to_real_world = ControlUtil.get_pose()
first_real_world_pts = PtsUtil.transform_point_cloud(
first_cam_pts, first_cam_to_real_world
)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(
first_real_world_pts
)
np.savetxt(f"first_real_pts_{model_name}.txt", first_splitted_real_world_pts)
""" register """
Log.info("[Part 1/4] do registeration")
real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("[Part 1/4] finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world: trimesh.Trimesh = cad_model.apply_transform(
cad_to_real_world
)
cad_model_real_world.export(
os.path.join(temp_dir, f"real_world_{temp_name}.obj")
)
cad_model_blender_world: trimesh.Trimesh = cad_model.apply_transform(
real_world_to_blender_world
)
with tempfile.TemporaryDirectory() as temp_dir:
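            # NOTE: the context-managed directory is immediately overridden below with a
            # fixed path, presumably so intermediate outputs survive for inspection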
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
cad_model_blender_world.export(os.path.join(temp_dir, f"{temp_name}.obj"))
""" sample view """
Log.info("[Part 2/4] start running renderer")
subprocess.run(
[
self.blender_bin_path,
"-b",
"-P",
self.generator_script_path,
"--",
temp_dir,
],
capture_output=True,
text=True,
)
Log.success("[Part 2/4] finish running renderer")
""" preprocess """
Log.info("[Part 3/4] start preprocessing data")
save_scene_data(temp_dir, temp_name)
Log.success("[Part 3/4] finish preprocessing data")
pts_dir = os.path.join(temp_dir, temp_name, "pts")
sample_view_pts_list = []
scan_points_idx_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(temp_dir, temp_name, "pts", f"{frame_idx}.txt")
idx_path = os.path.join(
temp_dir, temp_name, "scan_points_indices", f"{frame_idx}.npy"
)
point_cloud = np.loadtxt(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(
point_cloud, self.voxel_size
)
indices = np.load(idx_path)
                    indices = np.atleast_1d(indices)  # a single saved index loads as a 0-d array
sample_view_pts_list.append(sampled_point_cloud)
scan_points_idx_list.append(indices)
""" close-loop strategy """
scanned_pts = PtsUtil.voxel_downsample_point_cloud(
first_splitted_real_world_pts, self.voxel_size
)
shot_pts_list = [first_splitted_real_world_pts]
history_indices = []
last_coverage = 0
Log.info("[Part 4/4] start close-loop control")
cnt = 0
while True:
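                # each iteration: pick the next-best view among the rendered candidates,
                # move the robot there, shoot a new point cloud, merge it into the
                # scanned set, and check the stop criteria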
#import ipdb; ipdb.set_trace()
next_best_view, next_best_coverage, next_best_covered_num = (
ReconstructionUtil.compute_next_best_view_with_overlap(
scanned_pts,
sample_view_pts_list,
history_indices,
scan_points_idx_list,
threshold=self.voxel_size,
overlap_area_threshold=25,
scan_points_threshold=self.scan_points_threshold,
)
)
nbv_path = DataLoadUtil.get_path(temp_dir, temp_name, next_best_view)
nbv_cam_info = DataLoadUtil.load_cam_info(nbv_path, binocular=True)
nbv_cam_to_world = nbv_cam_info["cam_to_world_O"]
ControlUtil.move_to(nbv_cam_to_world)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("No view data received")
continue
cam_shot_pts = ViewUtil.get_pts(view_data)
world_shot_pts = PtsUtil.transform_point_cloud(
cam_shot_pts, first_cam_to_real_world
)
_, world_splitted_shot_pts = self.split_scan_pts_and_obj_pts(
world_shot_pts
)
shot_pts_list.append(world_splitted_shot_pts)
debug_dir = os.path.join(temp_dir, "debug")
if not os.path.exists(debug_dir):
os.makedirs(debug_dir)
np.savetxt(os.path.join(debug_dir, f"shot_pts_{cnt}.txt"), world_splitted_shot_pts)
np.savetxt(os.path.join(debug_dir, f"render_pts_{cnt}.txt"), sample_view_pts_list[next_best_view])
#real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
#import ipdb; ipdb.set_trace()
last_scanned_pts_num = scanned_pts.shape[0]
new_scanned_pts = PtsUtil.voxel_downsample_point_cloud(
np.vstack([scanned_pts, world_splitted_shot_pts]), self.voxel_size
)
new_scanned_pts_num = new_scanned_pts.shape[0]
history_indices.append(scan_points_idx_list[next_best_view])
scanned_pts = new_scanned_pts
Log.info(
f"Next Best cover pts: {next_best_covered_num}, Best coverage: {next_best_coverage}"
)
coverage_rate_increase = next_best_coverage - last_coverage
                if coverage_rate_increase < self.min_coverage_increase:
                    Log.info(f"Coverage rate increase = {coverage_rate_increase} < {self.min_coverage_increase}, stop scanning")
                    break
last_coverage = next_best_coverage
new_added_pts_num = new_scanned_pts_num - last_scanned_pts_num
if new_added_pts_num < self.min_shot_new_pts_num:
Log.info(f"New added pts num = {new_added_pts_num} < {self.min_shot_new_pts_num}")
#ipdb.set_trace()
                if len(shot_pts_list) >= self.max_shot_view_num:
                    Log.info(f"Scanned view num = {len(shot_pts_list)} >= {self.max_shot_view_num}, stop scanning")
                    break
cnt += 1
Log.success("[Part 4/4] finish close-loop control")
    def run(self):
        total = len(os.listdir(self.model_dir))
        model_start_idx = self.generate_config["model_start_idx"]
        count_object = model_start_idx
        for model_name in os.listdir(self.model_dir)[model_start_idx:]:
            Log.info(f"[{count_object}/{total}] Processing {model_name}")
            self.run_one_model(model_name)
            Log.success(f"[{count_object}/{total}] Finished processing {model_name}")
            count_object += 1
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"C:\Users\hofee\Downloads\mesh.obj"
model = trimesh.load(model_path)

224
runners/cad_open_loop_strategy.py Normal file

@@ -0,0 +1,224 @@
import os
import time
import trimesh
import tempfile
import subprocess
import numpy as np
from PytorchBoot.runners.runner import Runner
from PytorchBoot.config import ConfigManager
import PytorchBoot.stereotype as stereotype
from PytorchBoot.utils.log_util import Log
from PytorchBoot.status import status_manager
from utils.control_util import ControlUtil
from utils.communicate_util import CommunicateUtil
from utils.pts_util import PtsUtil
from utils.reconstruction_util import ReconstructionUtil
from utils.preprocess_util import save_scene_data, save_scene_data_multithread
from utils.data_load import DataLoadUtil
from utils.view_util import ViewUtil
@stereotype.runner("CAD_open_loop_strategy_runner")
class CADOpenLoopStrategyRunner(Runner):
def __init__(self, config_path: str):
super().__init__(config_path)
self.load_experiment("cad_open_loop_strategy")
self.status_info = {
"status_manager": status_manager,
"app_name": "cad",
"runner_name": "CAD_open_loop_strategy_runner"
}
self.generate_config = ConfigManager.get("runner", "generate")
self.reconstruct_config = ConfigManager.get("runner", "reconstruct")
self.blender_bin_path = self.generate_config["blender_bin_path"]
self.generator_script_path = self.generate_config["generator_script_path"]
self.model_dir = self.generate_config["model_dir"]
self.voxel_size = self.generate_config["voxel_size"]
self.max_view = self.generate_config["max_view"]
self.min_view = self.generate_config["min_view"]
self.max_diag = self.generate_config["max_diag"]
self.min_diag = self.generate_config["min_diag"]
self.min_cam_table_included_degree = self.generate_config["min_cam_table_included_degree"]
self.random_view_ratio = self.generate_config["random_view_ratio"]
self.soft_overlap_threshold = self.reconstruct_config["soft_overlap_threshold"]
self.hard_overlap_threshold = self.reconstruct_config["hard_overlap_threshold"]
self.scan_points_threshold = self.reconstruct_config["scan_points_threshold"]
def create_experiment(self, backup_name=None):
super().create_experiment(backup_name)
def load_experiment(self, backup_name=None):
super().load_experiment(backup_name)
def split_scan_pts_and_obj_pts(self, world_pts, z_threshold = 0):
scan_pts = world_pts[world_pts[:,2] < z_threshold]
obj_pts = world_pts[world_pts[:,2] >= z_threshold]
return scan_pts, obj_pts
def run_one_model(self, model_name):
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
result = dict()
shot_pts_list = []
ControlUtil.connect_robot()
''' init robot '''
Log.info("[Part 1/5] start init and register")
ControlUtil.init()
''' load CAD model '''
model_path = os.path.join(self.model_dir, model_name,"mesh.ply")
temp_name = "cad_model_world"
cad_model = trimesh.load(model_path)
''' take first view '''
Log.info("[Part 1/5] take first view data")
view_data = CommunicateUtil.get_view_data(init=True)
first_cam_pts = ViewUtil.get_pts(view_data)
first_cam_to_real_world = ControlUtil.get_pose()
first_real_world_pts = PtsUtil.transform_point_cloud(first_cam_pts, first_cam_to_real_world)
_, first_splitted_real_world_pts = self.split_scan_pts_and_obj_pts(first_real_world_pts)
np.savetxt(f"first_real_pts_{model_name}.txt", first_splitted_real_world_pts)
''' register '''
Log.info("[Part 1/5] do registeration")
real_world_to_cad = PtsUtil.register(first_splitted_real_world_pts, cad_model)
cad_to_real_world = np.linalg.inv(real_world_to_cad)
Log.success("[Part 1/5] finish init and register")
real_world_to_blender_world = np.eye(4)
real_world_to_blender_world[:3, 3] = np.asarray([0, 0, 0.9215])
cad_model_real_world:trimesh.Trimesh = cad_model.apply_transform(cad_to_real_world)
cad_model_real_world.export(os.path.join(temp_dir, f"real_world_{temp_name}.obj"))
cad_model_blender_world:trimesh.Trimesh = cad_model.apply_transform(real_world_to_blender_world)
with tempfile.TemporaryDirectory() as temp_dir:
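            # NOTE: as in the close-loop runner, the context-managed directory is
            # immediately overridden with a fixed path so outputs can be inspected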
temp_dir = "/home/yan20/nbv_rec/project/franka_control/temp_output"
cad_model_blender_world.export(os.path.join(temp_dir, f"{temp_name}.obj"))
scene_dir = os.path.join(temp_dir, temp_name)
''' sample view '''
Log.info("[Part 2/5] start running renderer")
subprocess.run([
self.blender_bin_path, '-b', '-P', self.generator_script_path, '--', temp_dir
], capture_output=True, text=True)
Log.success("[Part 2/5] finish running renderer")
world_model_points = np.loadtxt(os.path.join(scene_dir, "points_and_normals.txt"))[:,:3]
''' preprocess '''
Log.info("[Part 3/5] start preprocessing data")
save_scene_data(temp_dir, temp_name)
Log.success("[Part 3/5] finish preprocessing data")
pts_dir = os.path.join(temp_dir,temp_name,"pts")
sample_view_pts_list = []
scan_points_idx_list = []
frame_num = len(os.listdir(pts_dir))
for frame_idx in range(frame_num):
pts_path = os.path.join(temp_dir,temp_name, "pts", f"{frame_idx}.txt")
idx_path = os.path.join(temp_dir,temp_name, "scan_points_indices", f"{frame_idx}.npy")
point_cloud = np.loadtxt(pts_path)
if point_cloud.shape[0] != 0:
sampled_point_cloud = PtsUtil.voxel_downsample_point_cloud(point_cloud, self.voxel_size)
indices = np.load(idx_path)
                    indices = np.atleast_1d(indices)  # a single saved index loads as a 0-d array
sample_view_pts_list.append(sampled_point_cloud)
scan_points_idx_list.append(indices)
''' generate strategy '''
Log.info("[Part 4/5] start generating strategy")
limited_useful_view, _, _ = ReconstructionUtil.compute_next_best_view_sequence_with_overlap(
world_model_points, sample_view_pts_list,
scan_points_indices_list = scan_points_idx_list,
init_view=0,
threshold=self.voxel_size,
soft_overlap_threshold = self.soft_overlap_threshold,
hard_overlap_threshold = self.hard_overlap_threshold,
scan_points_threshold = self.scan_points_threshold,
status_info=self.status_info
)
Log.success("[Part 4/5] finish generating strategy")
''' extract cam_to_world sequence '''
cam_to_world_seq = []
            coverage_rate_seq = []
render_pts = []
idx_seq = []
for idx, coverage_rate in limited_useful_view:
path = DataLoadUtil.get_path(temp_dir, temp_name, idx)
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
cam_to_world_seq.append(cam_info["cam_to_world_O"])
                coverage_rate_seq.append(coverage_rate)
idx_seq.append(idx)
render_pts.append(sample_view_pts_list[idx])
Log.info("[Part 5/5] start running robot")
''' take best seq view '''
#import ipdb; ipdb.set_trace()
target_scanned_pts = np.concatenate(sample_view_pts_list)
voxel_downsampled_target_scanned_pts = PtsUtil.voxel_downsample_point_cloud(target_scanned_pts, self.voxel_size)
result = dict()
gt_scanned_pts = np.concatenate(render_pts, axis=0)
voxel_down_sampled_gt_scanned_pts = PtsUtil.voxel_downsample_point_cloud(gt_scanned_pts, self.voxel_size)
result["gt_final_coverage_rate_cad"] = ReconstructionUtil.compute_coverage_rate(voxel_downsampled_target_scanned_pts, voxel_down_sampled_gt_scanned_pts, self.voxel_size)
step = 1
result["real_coverage_rate_seq"] = []
for cam_to_world in cam_to_world_seq:
try:
ControlUtil.move_to(cam_to_world)
''' get world pts '''
time.sleep(0.5)
view_data = CommunicateUtil.get_view_data()
if view_data is None:
Log.error("Failed to get view data")
continue
cam_pts = ViewUtil.get_pts(view_data)
shot_pts_list.append(cam_pts)
scanned_pts = np.concatenate(shot_pts_list, axis=0)
voxel_down_sampled_scanned_pts = PtsUtil.voxel_downsample_point_cloud(scanned_pts, self.voxel_size)
voxel_down_sampled_scanned_pts_world = PtsUtil.transform_point_cloud(voxel_down_sampled_scanned_pts, first_cam_to_real_world)
curr_CR = ReconstructionUtil.compute_coverage_rate(voxel_downsampled_target_scanned_pts, voxel_down_sampled_scanned_pts_world, self.voxel_size)
Log.success(f"(step {step}/{len(cam_to_world_seq)}) current coverage: {curr_CR} | gt coverage: {result['gt_final_coverage_rate_cad']}")
result["real_final_coverage_rate"] = curr_CR
result["real_coverage_rate_seq"].append(curr_CR)
step += 1
except Exception as e:
Log.error(f"Failed to move to {cam_to_world}")
Log.error(e)
#import ipdb;ipdb.set_trace()
for idx in range(len(shot_pts_list)):
if not os.path.exists(os.path.join(temp_dir, temp_name, "shot_pts")):
os.makedirs(os.path.join(temp_dir, temp_name, "shot_pts"))
if not os.path.exists(os.path.join(temp_dir, temp_name, "render_pts")):
os.makedirs(os.path.join(temp_dir, temp_name, "render_pts"))
shot_pts = PtsUtil.transform_point_cloud(shot_pts_list[idx], first_cam_to_real_world)
np.savetxt(os.path.join(temp_dir, temp_name, "shot_pts", f"{idx}.txt"), shot_pts)
np.savetxt(os.path.join(temp_dir, temp_name, "render_pts", f"{idx}.txt"), render_pts[idx])
Log.success("[Part 5/5] finish running robot")
Log.debug(result)
    def run(self):
        total = len(os.listdir(self.model_dir))
        model_start_idx = self.generate_config["model_start_idx"]
        count_object = model_start_idx
        for model_name in os.listdir(self.model_dir)[model_start_idx:]:
            Log.info(f"[{count_object}/{total}] Processing {model_name}")
            self.run_one_model(model_name)
            Log.success(f"[{count_object}/{total}] Finished processing {model_name}")
            count_object += 1
# ---------------------------- test ---------------------------- #
if __name__ == "__main__":
model_path = r"C:\Users\hofee\Downloads\mesh.obj"
model = trimesh.load(model_path)

0
runners/inference.py Normal file

35
utils/communicate_util.py Normal file

@@ -0,0 +1,35 @@
import requests
import numpy as np
class CommunicateUtil:
VIEW_HOST = "192.168.1.2:7999" #"10.7.250.52:7999" ##
INFERENCE_HOST = "127.0.0.1"
INFERENCE_PORT = 5000
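    # fetch one RGB-D frame (depth image + intrinsics/extrinsics) from the scanner service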
    @staticmethod
    def get_view_data(init=False) -> dict:
params = {}
params["create_scanner"] = init
response = requests.get(f"http://{CommunicateUtil.VIEW_HOST}/api/data", json=params)
data = response.json()
if not data["success"]:
print(f"Failed to get view data")
return None
image_id = data["image_id"]
depth_image = np.array(data["depth_image"], dtype=np.uint16)
depth_intrinsics = data["depth_intrinsics"]
depth_extrinsics = np.array(data["depth_extrinsics"])
view_data = {
"image_id": image_id,
"depth_image": depth_image,
"depth_intrinsics": depth_intrinsics,
"depth_extrinsics": depth_extrinsics
}
return view_data
    @staticmethod
    def get_inference_data(view_data: dict) -> dict:
data = {}
return data

utils/control_util.py

@@ -1,39 +1,227 @@
import numpy as np
from frankapy import FrankaArm
from autolab_core import RigidTransform
+import serial
+import time
 class ControlUtil:
-    __fa = FrankaArm(robot_num=2)
+    __fa:FrankaArm = None
+    __ser: serial.Serial = None
+    curr_rotation = 0
-    DISPLAYTABLE_TO_BASE:np.ndarray = np.asarray([
-        [1, 0, 0, 0],
-        [0, 1, 0, 0],
-        [0, 0, 1, 0],
+    BASE_TO_WORLD:np.ndarray = np.asarray([
+        [1, 0, 0, -0.61091665],
+        [0, 1, 0, -0.00309726],
+        [0, 0, 1, -0.1136743],
         [0, 0, 0, 1]
     ])
-    GRIPPER_TO_CAMERA:np.ndarray = np.asarray([
+    CAMERA_TO_GRIPPER:np.ndarray = np.asarray([
         [0, -1, 0, 0.01],
         [1, 0, 0, 0],
-        [0, 1, 0, 0],
-        [0, 0, 1, 0],
+        [0, 0, 1, 0.08],
         [0, 0, 0, 1]
     ])
INIT_GRIPPER_POSE:np.ndarray = np.asarray([
[ 0.46532393, 0.62171798, 0.63002284, 0.21230963],
[ 0.43205618, -0.78075723, 0.45136491, -0.25127173],
[ 0.77251656, 0.06217437, -0.63193429, 0.499957 ],
[ 0. , 0. , 0. , 1. ],
])
     @staticmethod
-    def get_franka_arm() -> FrankaArm:
-        return ControlUtil.__fa
+    def connect_robot():
+        if ControlUtil.__fa is None:
+            ControlUtil.__fa = FrankaArm(robot_num=2)
+        if ControlUtil.__ser is None:
+            ControlUtil.__ser = serial.Serial(port="/dev/ttyUSB0", baudrate=115200)
@staticmethod
def franka_reset() -> None:
ControlUtil.__fa.reset_joints()
@staticmethod
def init() -> None:
ControlUtil.franka_reset()
ControlUtil.set_gripper_pose(ControlUtil.INIT_GRIPPER_POSE)
@staticmethod
def get_pose() -> np.ndarray:
gripper_to_base = ControlUtil.get_curr_gripper_to_base_pose()
cam_to_world = ControlUtil.BASE_TO_WORLD @ gripper_to_base @ ControlUtil.CAMERA_TO_GRIPPER
return cam_to_world
@staticmethod
def set_pose(cam_to_world: np.ndarray) -> None:
gripper_to_base = ControlUtil.solve_gripper_to_base(cam_to_world)
ControlUtil.set_gripper_pose(gripper_to_base)
@staticmethod
def rotate_display_table(degree):
turn_directions = {
"left": 1,
"right": 0
}
        delta_degree = degree - ControlUtil.curr_rotation
        ControlUtil.curr_rotation += delta_degree
        print(f"Table rotated {ControlUtil.curr_rotation} degree")
        if delta_degree >= 0:
            turn_angle = delta_degree
            turn_direction = turn_directions["right"]
        else:
            turn_angle = -delta_degree
            turn_direction = turn_directions["left"]
        ControlUtil.__ser.write(f"CT+TRUNSINGLE({turn_direction},{turn_angle});".encode('utf-8'))
@staticmethod
def get_curr_gripper_to_base_pose() -> np.ndarray:
return ControlUtil.__fa.get_pose().matrix
     @staticmethod
-    def reset() -> None:
-        ControlUtil.__fa.reset_joints()
+    def set_gripper_pose(gripper_to_base: np.ndarray) -> None:
gripper_to_base = RigidTransform(rotation=gripper_to_base[:3, :3], translation=gripper_to_base[:3, 3], from_frame="franka_tool", to_frame="world")
ControlUtil.__fa.goto_pose(gripper_to_base, duration=5, use_impedance=False, ignore_errors=False)
@staticmethod
def solve_gripper_to_base(cam_to_world: np.ndarray) -> np.ndarray:
return np.linalg.inv(ControlUtil.BASE_TO_WORLD) @ cam_to_world @ np.linalg.inv(ControlUtil.CAMERA_TO_GRIPPER)
@staticmethod
def sovle_cam_to_world(gripper_to_base: np.ndarray) -> np.ndarray:
return ControlUtil.BASE_TO_WORLD @ gripper_to_base @ ControlUtil.CAMERA_TO_GRIPPER
@staticmethod
def check_limit(new_cam_to_world):
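        # reachable region: the camera must sit in the (-x, -y) quadrant of the world
        # frame, within a 30-60 degree wedge (y/x between sqrt(3)/3 and sqrt(3))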
if new_cam_to_world[0,3] > 0 or new_cam_to_world[1,3] > 0:
return False
x = abs(new_cam_to_world[0,3])
y = abs(new_cam_to_world[1,3])
tan_y_x = y/x
if tan_y_x < np.sqrt(3)/3 or tan_y_x > np.sqrt(3):
return False
return True
@staticmethod
def solve_display_table_rot_and_cam_to_world(cam_to_world: np.ndarray) -> tuple:
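        # if the requested pose is unreachable, search (in ~0.2 degree steps) for the
        # smallest display-table rotation whose equivalent camera pose is reachable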
if ControlUtil.check_limit(cam_to_world):
return 0, cam_to_world
else:
min_display_table_rot = 180
min_new_cam_to_world = None
for display_table_rot in np.linspace(0.1,360, 1800):
display_table_rot_z_pose = ControlUtil.get_z_axis_rot_mat(display_table_rot)
new_cam_to_world = np.linalg.inv(display_table_rot_z_pose) @ cam_to_world
if ControlUtil.check_limit(new_cam_to_world):
if display_table_rot < min_display_table_rot:
min_display_table_rot, min_new_cam_to_world = display_table_rot, new_cam_to_world
if abs(display_table_rot - 360) < min_display_table_rot:
min_display_table_rot, min_new_cam_to_world = display_table_rot - 360, new_cam_to_world
if min_new_cam_to_world is None:
raise ValueError("No valid display table rotation found")
return min_display_table_rot, min_new_cam_to_world
@staticmethod
def get_z_axis_rot_mat(degree):
radian = np.radians(degree)
return np.array([
[np.cos(radian), -np.sin(radian), 0, 0],
[np.sin(radian), np.cos(radian), 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]
])
@staticmethod
    def get_gripper_to_base_axis_angle(gripper_to_base: np.ndarray) -> float:
rot_mat = gripper_to_base[:3,:3]
gripper_z_axis = rot_mat[:,2]
base_x_axis = np.array([1,0,0])
angle = np.arccos(np.dot(gripper_z_axis, base_x_axis))
return angle
@staticmethod
def move_to(pose: np.ndarray):
rot_degree, cam_to_world = ControlUtil.solve_display_table_rot_and_cam_to_world(pose)
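        # the table turns at roughly 9 degrees per second (hence abs(rot_degree)/9);
        # if the arm finishes first, sleep out the remaining rotation time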
exec_time = abs(rot_degree)/9
start_time = time.time()
ControlUtil.rotate_display_table(rot_degree)
ControlUtil.set_pose(cam_to_world)
end_time = time.time()
print(f"Move to pose with rotation {rot_degree} degree, exec time: {end_time - start_time}|exec time: {exec_time}")
if end_time - start_time < exec_time:
time.sleep(exec_time - (end_time - start_time))
# ----------- Debug Test -------------
if __name__ == "__main__":
-    print(ControlUtil.get_pose())
-    ControlUtil.reset()
-    print(ControlUtil.get_pose())
ControlUtil.connect_robot()
# ControlUtil.franka_reset()
def main_test():
print(ControlUtil.get_curr_gripper_to_base_pose())
ControlUtil.init()
def rotate_back(rotation):
ControlUtil.rotate_display_table(-rotation)
#main_test()
import sys; sys.path.append("/home/yan20/nbv_rec/project/franka_control")
from utils.communicate_util import CommunicateUtil
import ipdb
ControlUtil.init()
view_data_0 = CommunicateUtil.get_view_data(init=True)
depth_extrinsics_0 = view_data_0["depth_extrinsics"]
cam_to_world_0 = ControlUtil.get_pose()
print("cam_extrinsics_0")
print(depth_extrinsics_0)
print("cam_to_world_0")
print(cam_to_world_0)
ipdb.set_trace()
TEST_POSE:np.ndarray = np.asarray([
[ 0.46532393, 0.62171798, 0.63002284, 0.30230963],
[ 0.43205618, -0.78075723, 0.45136491, -0.29127173],
[ 0.77251656, 0.06217437, -0.63193429, 0.559957 ],
[ 0. , 0. , 0. , 1. ],
])
TEST_POSE_CAM_TO_WORLD = ControlUtil.BASE_TO_WORLD @ TEST_POSE @ ControlUtil.CAMERA_TO_GRIPPER
ControlUtil.move_to(TEST_POSE_CAM_TO_WORLD)
view_data_1 = CommunicateUtil.get_view_data()
depth_extrinsics_1 = view_data_1["depth_extrinsics"]
depth_extrinsics_1[:3,3] = depth_extrinsics_1[:3,3] / 1000
cam_to_world_1 = ControlUtil.get_pose()
print("cam_extrinsics_1")
print(depth_extrinsics_1)
print("cam_to_world_1")
print(TEST_POSE_CAM_TO_WORLD)
actual_cam_to_world_1 = cam_to_world_0 @ depth_extrinsics_1
print("actual_cam_to_world_1")
print(actual_cam_to_world_1)
ipdb.set_trace()
TEST_POSE_2:np.ndarray = np.asarray(
[[ 0.74398544, -0.61922696, 0.251049, 0.47000935],
[-0.47287207, -0.75338888, -0.45692666, 0.20843903],
[ 0.47207883 , 0.22123272, -0.85334192, 0.57863381],
[ 0. , 0. , 0. , 1. , ]]
)
TEST_POSE_CAM_TO_WORLD_2 = ControlUtil.BASE_TO_WORLD @ TEST_POSE_2 @ ControlUtil.CAMERA_TO_GRIPPER
#ControlUtil.move_to(TEST_POSE_CAM_TO_WORLD_2)
ControlUtil.set_pose(TEST_POSE_CAM_TO_WORLD_2)
view_data_2 = CommunicateUtil.get_view_data()
depth_extrinsics_2 = view_data_2["depth_extrinsics"]
depth_extrinsics_2[:3,3] = depth_extrinsics_2[:3,3] / 1000
cam_to_world_2 = ControlUtil.get_pose()
print("cam_extrinsics_2")
print(depth_extrinsics_2)
print("cam_to_world_2")
print(TEST_POSE_CAM_TO_WORLD_2)
actual_cam_to_world_2 = cam_to_world_0 @ depth_extrinsics_2
print("actual_cam_to_world_2")
print(actual_cam_to_world_2)
ipdb.set_trace()

410
utils/data_load.py Normal file

@@ -0,0 +1,410 @@
import os
import numpy as np
import json
import cv2
import trimesh
import torch
from utils.pts_util import PtsUtil
class DataLoadUtil:
TABLE_POSITION = np.asarray([0, 0, 0.8215])
@staticmethod
def get_display_table_info(root, scene_name):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
display_table_info = scene_info["display_table"]
return display_table_info
@staticmethod
def get_display_table_top(root, scene_name):
display_table_height = DataLoadUtil.get_display_table_info(root, scene_name)[
"height"
]
display_table_top = DataLoadUtil.TABLE_POSITION + np.asarray(
[0, 0, display_table_height]
)
return display_table_top
@staticmethod
def get_path(root, scene_name, frame_idx):
path = os.path.join(root, scene_name, f"{frame_idx}")
return path
@staticmethod
def get_label_num(root, scene_name):
label_dir = os.path.join(root, scene_name, "label")
return len(os.listdir(label_dir))
@staticmethod
def get_label_path(root, scene_name, seq_idx):
label_dir = os.path.join(root, scene_name, "label")
if not os.path.exists(label_dir):
os.makedirs(label_dir)
path = os.path.join(label_dir, f"{seq_idx}.json")
return path
@staticmethod
def get_label_path_old(root, scene_name):
path = os.path.join(root, scene_name, "label.json")
return path
@staticmethod
def get_scene_seq_length(root, scene_name):
camera_params_path = os.path.join(root, scene_name, "camera_params")
return len(os.listdir(camera_params_path))
@staticmethod
def load_mesh_at(model_dir, object_name, world_object_pose):
model_path = os.path.join(model_dir, object_name, "mesh.obj")
mesh = trimesh.load(model_path)
mesh.apply_transform(world_object_pose)
return mesh
@staticmethod
def get_bbox_diag(model_dir, object_name):
model_path = os.path.join(model_dir, object_name, "mesh.obj")
mesh = trimesh.load(model_path)
bbox = mesh.bounding_box.extents
diagonal_length = np.linalg.norm(bbox)
return diagonal_length
@staticmethod
def save_mesh_at(model_dir, output_dir, object_name, scene_name, world_object_pose):
mesh = DataLoadUtil.load_mesh_at(model_dir, object_name, world_object_pose)
model_path = os.path.join(output_dir, scene_name, "world_mesh.obj")
mesh.export(model_path)
@staticmethod
def save_target_mesh_at_world_space(
root, model_dir, scene_name, display_table_as_world_space_origin=True
):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
target_name = scene_info["target_name"]
transformation = scene_info[target_name]
if display_table_as_world_space_origin:
location = transformation["location"] - DataLoadUtil.get_display_table_top(
root, scene_name
)
else:
location = transformation["location"]
rotation_euler = transformation["rotation_euler"]
pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
pose_mat[:3, 3] = location
mesh = DataLoadUtil.load_mesh_at(model_dir, target_name, pose_mat)
mesh_dir = os.path.join(root, scene_name, "mesh")
if not os.path.exists(mesh_dir):
os.makedirs(mesh_dir)
model_path = os.path.join(mesh_dir, "world_target_mesh.obj")
mesh.export(model_path)
@staticmethod
def load_scene_info(root, scene_name):
scene_info_path = os.path.join(root, scene_name, "scene_info.json")
with open(scene_info_path, "r") as f:
scene_info = json.load(f)
return scene_info
@staticmethod
def load_target_pts_num_dict(root, scene_name):
target_pts_num_path = os.path.join(root, scene_name, "target_pts_num.json")
with open(target_pts_num_path, "r") as f:
target_pts_num_dict = json.load(f)
return target_pts_num_dict
@staticmethod
def load_target_object_pose(root, scene_name):
scene_info = DataLoadUtil.load_scene_info(root, scene_name)
target_name = scene_info["target_name"]
transformation = scene_info[target_name]
location = transformation["location"]
rotation_euler = transformation["rotation_euler"]
pose_mat = trimesh.transformations.euler_matrix(*rotation_euler)
pose_mat[:3, 3] = location
return pose_mat
@staticmethod
def load_depth(path, min_depth=0.01, max_depth=5.0, binocular=False):
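        # depth PNGs are 16-bit; map [0, 65535] linearly onto [min_depth, max_depth] meters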
def load_depth_from_real_path(real_path, min_depth, max_depth):
depth = cv2.imread(real_path, cv2.IMREAD_UNCHANGED)
            depth = depth.astype(np.float32) / 65535.0
            depth_meters = min_depth + (max_depth - min_depth) * depth
return depth_meters
if binocular:
depth_path_L = os.path.join(
os.path.dirname(path), "depth", os.path.basename(path) + "_L.png"
)
depth_path_R = os.path.join(
os.path.dirname(path), "depth", os.path.basename(path) + "_R.png"
)
depth_meters_L = load_depth_from_real_path(
depth_path_L, min_depth, max_depth
)
depth_meters_R = load_depth_from_real_path(
depth_path_R, min_depth, max_depth
)
return depth_meters_L, depth_meters_R
else:
depth_path = os.path.join(
os.path.dirname(path), "depth", os.path.basename(path) + ".png"
)
depth_meters = load_depth_from_real_path(depth_path, min_depth, max_depth)
return depth_meters
@staticmethod
def load_seg(path, binocular=False, left_only=False):
if binocular and not left_only:
def clean_mask(mask_image):
green = [0, 255, 0, 255]
red = [255, 0, 0, 255]
threshold = 2
mask_image = np.where(
np.abs(mask_image - green) <= threshold, green, mask_image
)
mask_image = np.where(
np.abs(mask_image - red) <= threshold, red, mask_image
)
return mask_image
mask_path_L = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + "_L.png"
)
mask_image_L = clean_mask(cv2.imread(mask_path_L, cv2.IMREAD_UNCHANGED))
mask_path_R = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + "_R.png"
)
mask_image_R = clean_mask(cv2.imread(mask_path_R, cv2.IMREAD_UNCHANGED))
return mask_image_L, mask_image_R
else:
if binocular and left_only:
mask_path = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + "_L.png"
)
else:
mask_path = os.path.join(
os.path.dirname(path), "mask", os.path.basename(path) + ".png"
)
mask_image = cv2.imread(mask_path, cv2.IMREAD_UNCHANGED)
return mask_image
@staticmethod
def load_normal(path, binocular=False, left_only=False):
if binocular and not left_only:
normal_path_L = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
)
normal_image_L = cv2.imread(normal_path_L, cv2.IMREAD_COLOR)
normal_path_R = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_R.png"
)
normal_image_R = cv2.imread(normal_path_R, cv2.IMREAD_COLOR)
normalized_normal_image_L = normal_image_L / 255.0 * 2.0 - 1.0
normalized_normal_image_R = normal_image_R / 255.0 * 2.0 - 1.0
return normalized_normal_image_L, normalized_normal_image_R
else:
if binocular and left_only:
normal_path = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + "_L.png"
)
else:
normal_path = os.path.join(
os.path.dirname(path), "normal", os.path.basename(path) + ".png"
)
normal_image = cv2.imread(normal_path, cv2.IMREAD_COLOR)
normalized_normal_image = normal_image / 255.0 * 2.0 - 1.0
return normalized_normal_image
@staticmethod
def load_label(path):
with open(path, "r") as f:
label_data = json.load(f)
return label_data
@staticmethod
def load_rgb(path):
rgb_path = os.path.join(
os.path.dirname(path), "rgb", os.path.basename(path) + ".png"
)
rgb_image = cv2.imread(rgb_path, cv2.IMREAD_COLOR)
return rgb_image
@staticmethod
def load_from_preprocessed_pts(path):
npy_path = os.path.join(
os.path.dirname(path), "pts", os.path.basename(path) + ".npy"
)
pts = np.load(npy_path)
return pts
@staticmethod
def cam_pose_transformation(cam_pose_before):
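        # right-multiplying by diag(1, -1, -1, 1) flips the camera's y and z axes,
        # i.e. converts between Blender/OpenGL and OpenCV camera conventions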
offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])
cam_pose_after = cam_pose_before @ offset
return cam_pose_after
@staticmethod
def load_cam_info(path, binocular=False, display_table_as_world_space_origin=True):
scene_dir = os.path.dirname(path)
root_dir = os.path.dirname(scene_dir)
scene_name = os.path.basename(scene_dir)
camera_params_path = os.path.join(
os.path.dirname(path), "camera_params", os.path.basename(path) + ".json"
)
with open(camera_params_path, "r") as f:
label_data = json.load(f)
cam_to_world = np.asarray(label_data["extrinsic"])
cam_to_world = DataLoadUtil.cam_pose_transformation(cam_to_world)
world_to_display_table = np.eye(4)
world_to_display_table[:3, 3] = -DataLoadUtil.get_display_table_top(
root_dir, scene_name
)
if display_table_as_world_space_origin:
cam_to_world = np.dot(world_to_display_table, cam_to_world)
cam_intrinsic = np.asarray(label_data["intrinsic"])
cam_info = {
"cam_to_world": cam_to_world,
"cam_intrinsic": cam_intrinsic,
"far_plane": label_data["far_plane"],
"near_plane": label_data["near_plane"],
}
if binocular:
cam_to_world_R = np.asarray(label_data["extrinsic_R"])
cam_to_world_R = DataLoadUtil.cam_pose_transformation(cam_to_world_R)
cam_to_world_O = np.asarray(label_data["extrinsic_cam_object"])
cam_to_world_O = DataLoadUtil.cam_pose_transformation(cam_to_world_O)
if display_table_as_world_space_origin:
cam_to_world_O = np.dot(world_to_display_table, cam_to_world_O)
cam_to_world_R = np.dot(world_to_display_table, cam_to_world_R)
cam_info["cam_to_world_O"] = cam_to_world_O
cam_info["cam_to_world_R"] = cam_to_world_R
return cam_info
@staticmethod
def get_real_cam_O_from_cam_L(
cam_L, cam_O_to_cam_L, scene_path, display_table_as_world_space_origin=True
):
root_dir = os.path.dirname(scene_path)
scene_name = os.path.basename(scene_path)
if isinstance(cam_L, torch.Tensor):
cam_L = cam_L.cpu().numpy()
nO_to_display_table_pose = cam_L @ cam_O_to_cam_L
if display_table_as_world_space_origin:
display_table_to_world = np.eye(4)
display_table_to_world[:3, 3] = DataLoadUtil.get_display_table_top(
root_dir, scene_name
)
nO_to_world_pose = np.dot(display_table_to_world, nO_to_display_table_pose)
nO_to_world_pose = DataLoadUtil.cam_pose_transformation(nO_to_world_pose)
return nO_to_world_pose
@staticmethod
def get_target_point_cloud(
depth, cam_intrinsic, cam_extrinsic, mask, target_mask_label=(0, 255, 0, 255), require_full_points=False
):
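        # back-project every depth pixel through the pinhole intrinsics, then keep
        # only pixels whose mask color matches target_mask_label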
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
mask = mask.reshape(-1, 4)
target_mask = (mask == target_mask_label).all(axis=-1)
target_points_camera = points_camera[target_mask]
target_points_camera_aug = np.concatenate(
[target_points_camera, np.ones((target_points_camera.shape[0], 1))], axis=-1
)
target_points_world = np.dot(cam_extrinsic, target_points_camera_aug.T).T[:, :3]
data = {
"points_world": target_points_world,
"points_camera": target_points_camera,
}
return data
@staticmethod
def get_point_cloud(depth, cam_intrinsic, cam_extrinsic):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
points_camera_aug = np.concatenate(
[points_camera, np.ones((points_camera.shape[0], 1))], axis=-1
)
points_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3]
return {"points_world": points_world, "points_camera": points_camera}
@staticmethod
def get_target_point_cloud_world_from_path(
path,
binocular=False,
random_downsample_N=65536,
voxel_size=0.005,
target_mask_label=(0, 255, 0, 255),
display_table_mask_label=(0, 0, 255, 255),
get_display_table_pts=False,
require_normal=False,
):
cam_info = DataLoadUtil.load_cam_info(path, binocular=binocular)
if binocular:
depth_L, depth_R = DataLoadUtil.load_depth(
path, cam_info["near_plane"], cam_info["far_plane"], binocular=True
)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
point_cloud_L = DataLoadUtil.get_target_point_cloud(
depth_L,
cam_info["cam_intrinsic"],
cam_info["cam_to_world"],
mask_L,
target_mask_label,
)["points_world"]
point_cloud_R = DataLoadUtil.get_target_point_cloud(
depth_R,
cam_info["cam_intrinsic"],
cam_info["cam_to_world_R"],
mask_R,
target_mask_label,
)["points_world"]
point_cloud_L = PtsUtil.random_downsample_point_cloud(
point_cloud_L, random_downsample_N
)
point_cloud_R = PtsUtil.random_downsample_point_cloud(
point_cloud_R, random_downsample_N
)
overlap_points = PtsUtil.get_overlapping_points(
point_cloud_L, point_cloud_R, voxel_size
)
return overlap_points
else:
depth = DataLoadUtil.load_depth(
path, cam_info["near_plane"], cam_info["far_plane"]
)
mask = DataLoadUtil.load_seg(path)
point_cloud = DataLoadUtil.get_target_point_cloud(
depth, cam_info["cam_intrinsic"], cam_info["cam_to_world"], mask
)["points_world"]
return point_cloud
@staticmethod
def load_points_normals(root, scene_name, display_table_as_world_space_origin=True):
points_path = os.path.join(root, scene_name, "points_and_normals.txt")
points_normals = np.loadtxt(points_path)
if display_table_as_world_space_origin:
points_normals[:, :3] = points_normals[
:, :3
] - DataLoadUtil.get_display_table_top(root, scene_name)
return points_normals

151
utils/pose_util.py Normal file

@@ -0,0 +1,151 @@
import numpy as np
class PoseUtil:
ROTATION = 1
TRANSLATION = 2
SCALE = 3
@staticmethod
def get_uniform_translation(trans_m_min, trans_m_max, trans_unit, debug=False):
if isinstance(trans_m_min, list):
x_min, y_min, z_min = trans_m_min
x_max, y_max, z_max = trans_m_max
else:
x_min, y_min, z_min = trans_m_min, trans_m_min, trans_m_min
x_max, y_max, z_max = trans_m_max, trans_m_max, trans_m_max
x = np.random.uniform(x_min, x_max)
y = np.random.uniform(y_min, y_max)
z = np.random.uniform(z_min, z_max)
translation = np.array([x, y, z])
if trans_unit == "cm":
translation = translation / 100
if debug:
print("uniform translation:", translation)
return translation
@staticmethod
def get_uniform_rotation(rot_degree_min=0, rot_degree_max=180, debug=False):
axis = np.random.randn(3)
axis /= np.linalg.norm(axis)
theta = np.random.uniform(
rot_degree_min / 180 * np.pi, rot_degree_max / 180 * np.pi
)
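        # Rodrigues' rotation formula: R = I + sin(theta) K + (1 - cos(theta)) K^2,
        # where K is the skew-symmetric cross-product matrix of the unit axis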
K = np.array(
[[0, -axis[2], axis[1]], [axis[2], 0, -axis[0]], [-axis[1], axis[0], 0]]
)
R = np.eye(3) + np.sin(theta) * K + (1 - np.cos(theta)) * (K @ K)
if debug:
print("uniform rotation:", theta * 180 / np.pi)
return R
@staticmethod
def get_uniform_pose(
trans_min, trans_max, rot_min=0, rot_max=180, trans_unit="cm", debug=False
):
translation = PoseUtil.get_uniform_translation(
trans_min, trans_max, trans_unit, debug
)
rotation = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug)
pose = np.eye(4)
pose[:3, :3] = rotation
pose[:3, 3] = translation
return pose
@staticmethod
def get_n_uniform_pose(
trans_min,
trans_max,
rot_min=0,
rot_max=180,
n=1,
trans_unit="cm",
fix=None,
contain_canonical=True,
debug=False,
):
if fix == PoseUtil.ROTATION:
translations = np.zeros((n, 3))
for i in range(n):
translations[i] = PoseUtil.get_uniform_translation(
trans_min, trans_max, trans_unit, debug
)
if contain_canonical:
translations[0] = np.zeros(3)
rotations = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug)
elif fix == PoseUtil.TRANSLATION:
rotations = np.zeros((n, 3, 3))
for i in range(n):
rotations[i] = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug)
if contain_canonical:
rotations[0] = np.eye(3)
translations = PoseUtil.get_uniform_translation(
trans_min, trans_max, trans_unit, debug
)
else:
translations = np.zeros((n, 3))
rotations = np.zeros((n, 3, 3))
for i in range(n):
translations[i] = PoseUtil.get_uniform_translation(
trans_min, trans_max, trans_unit, debug
)
for i in range(n):
rotations[i] = PoseUtil.get_uniform_rotation(rot_min, rot_max, debug)
if contain_canonical:
translations[0] = np.zeros(3)
rotations[0] = np.eye(3)
pose = np.eye(4, 4, k=0)[np.newaxis, :].repeat(n, axis=0)
pose[:, :3, :3] = rotations
pose[:, :3, 3] = translations
return pose
@staticmethod
def get_n_uniform_pose_batch(
trans_min,
trans_max,
rot_min=0,
rot_max=180,
n=1,
batch_size=1,
trans_unit="cm",
fix=None,
contain_canonical=False,
debug=False,
):
batch_poses = []
for i in range(batch_size):
pose = PoseUtil.get_n_uniform_pose(
trans_min,
trans_max,
rot_min,
rot_max,
n,
trans_unit,
fix,
contain_canonical,
debug,
)
batch_poses.append(pose)
pose_batch = np.stack(batch_poses, axis=0)
return pose_batch
@staticmethod
def get_uniform_scale(scale_min, scale_max, debug=False):
if isinstance(scale_min, list):
x_min, y_min, z_min = scale_min
x_max, y_max, z_max = scale_max
else:
x_min, y_min, z_min = scale_min, scale_min, scale_min
x_max, y_max, z_max = scale_max, scale_max, scale_max
x = np.random.uniform(x_min, x_max)
y = np.random.uniform(y_min, y_max)
z = np.random.uniform(z_min, z_max)
scale = np.array([x, y, z])
if debug:
print("uniform scale:", scale)
return scale

222
utils/preprocess_util.py Normal file

@@ -0,0 +1,222 @@
import os
import numpy as np
import time
import sys
np.random.seed(0)
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils.reconstruction_util import ReconstructionUtil
from utils.data_load import DataLoadUtil
from utils.pts_util import PtsUtil
from PytorchBoot.utils.log_util import Log
def save_np_pts(path, pts: np.ndarray, file_type="txt"):
if file_type == "txt":
np.savetxt(path, pts)
else:
np.save(path, pts)
def save_target_points(root, scene, frame_idx, target_points: np.ndarray, file_type="txt"):
pts_path = os.path.join(root,scene, "pts", f"{frame_idx}.{file_type}")
if not os.path.exists(os.path.join(root,scene, "pts")):
os.makedirs(os.path.join(root,scene, "pts"))
save_np_pts(pts_path, target_points, file_type)
def save_scan_points_indices(root, scene, frame_idx, scan_points_indices: np.ndarray, file_type="txt"):
    file_type = "npy"  # indices are always stored as .npy, regardless of the caller's file_type
indices_path = os.path.join(root,scene, "scan_points_indices", f"{frame_idx}.{file_type}")
if not os.path.exists(os.path.join(root,scene, "scan_points_indices")):
os.makedirs(os.path.join(root,scene, "scan_points_indices"))
save_np_pts(indices_path, scan_points_indices, file_type)
def save_scan_points(root, scene, scan_points: np.ndarray):
scan_points_path = os.path.join(root,scene, "scan_points.txt")
save_np_pts(scan_points_path, scan_points)
def get_world_points(depth, mask, cam_intrinsic, cam_extrinsic, random_downsample_N):
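    # back-project the masked depth pixels to camera space, randomly downsample,
    # then lift the sampled points into world space via cam_extrinsic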
z = depth[mask]
i, j = np.nonzero(mask)
x = (j - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (i - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
sampled_target_points = PtsUtil.random_downsample_point_cloud(
points_camera, random_downsample_N
)
points_camera_aug = np.concatenate((sampled_target_points, np.ones((sampled_target_points.shape[0], 1))), axis=-1)
points_camera_world = np.dot(cam_extrinsic, points_camera_aug.T).T[:, :3]
return points_camera_world
def get_scan_points_indices(scan_points, mask, display_table_mask_label, cam_intrinsic, cam_extrinsic):
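    # project the fixed table scan points into the image and return the indices of
    # those that land on a display-table-colored mask pixel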
scan_points_homogeneous = np.hstack((scan_points, np.ones((scan_points.shape[0], 1))))
points_camera = np.dot(np.linalg.inv(cam_extrinsic), scan_points_homogeneous.T).T[:, :3]
points_image_homogeneous = np.dot(cam_intrinsic, points_camera.T).T
points_image_homogeneous /= points_image_homogeneous[:, 2:]
pixel_x = points_image_homogeneous[:, 0].astype(int)
pixel_y = points_image_homogeneous[:, 1].astype(int)
h, w = mask.shape[:2]
valid_indices = (pixel_x >= 0) & (pixel_x < w) & (pixel_y >= 0) & (pixel_y < h)
mask_colors = mask[pixel_y[valid_indices], pixel_x[valid_indices]]
selected_points_indices = np.where((mask_colors == display_table_mask_label).all(axis=-1))[0]
selected_points_indices = np.where(valid_indices)[0][selected_points_indices]
return selected_points_indices
def save_scene_data(root, scene, file_type="txt"):
''' configuration '''
target_mask_label = (0, 255, 0, 255)
display_table_mask_label=(0, 0, 255, 255)
random_downsample_N = 32768
voxel_size=0.002
filter_degree = 75
min_z = 0.25
max_z = 0.5
''' scan points '''
display_table_info = DataLoadUtil.get_display_table_info(root, scene)
radius = display_table_info["radius"]
scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0,display_table_radius=radius))
''' read frame data(depth|mask|normal) '''
frame_num = DataLoadUtil.get_scene_seq_length(root, scene)
for frame_id in range(frame_num):
Log.info(f"frame({frame_id}/{frame_num})]Processing {scene} frame {frame_id}")
path = DataLoadUtil.get_path(root, scene, frame_id)
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
depth_L, depth_R = DataLoadUtil.load_depth(
path, cam_info["near_plane"],
cam_info["far_plane"],
binocular=True
)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
''' target points '''
mask_img_L = mask_L
mask_img_R = mask_R
target_mask_img_L = (mask_L == target_mask_label).all(axis=-1)
target_mask_img_R = (mask_R == target_mask_label).all(axis=-1)
sampled_target_points_L = get_world_points(depth_L, target_mask_img_L, cam_info["cam_intrinsic"], cam_info["cam_to_world"], random_downsample_N)
sampled_target_points_R = get_world_points(depth_R, target_mask_img_R, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"], random_downsample_N)
has_points = sampled_target_points_L.shape[0] > 0 and sampled_target_points_R.shape[0] > 0
if has_points:
target_points = PtsUtil.get_overlapping_points(
sampled_target_points_L, sampled_target_points_R, voxel_size
)
if has_points:
has_points = target_points.shape[0] > 0
if has_points:
points_normals = DataLoadUtil.load_points_normals(root, scene, display_table_as_world_space_origin=True)
target_points = PtsUtil.filter_points(
target_points, points_normals, cam_info["cam_to_world"],voxel_size=0.002, theta = filter_degree, z_range=(min_z, max_z)
)
''' scan points indices '''
scan_points_indices_L = get_scan_points_indices(scan_points, mask_L, display_table_mask_label, cam_info["cam_intrinsic"], cam_info["cam_to_world"])
scan_points_indices_R = get_scan_points_indices(scan_points, mask_R, display_table_mask_label, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"])
scan_points_indices = np.intersect1d(scan_points_indices_L, scan_points_indices_R)
if not has_points:
target_points = np.zeros((0, 3))
save_target_points(root, scene, frame_id, target_points, file_type=file_type)
save_scan_points_indices(root, scene, frame_id, scan_points_indices, file_type=file_type)
save_scan_points(root, scene, scan_points) # The "done" flag of scene preprocess
def process_frame(frame_id, root, scene, scan_points, file_type, target_mask_label, display_table_mask_label, random_downsample_N, voxel_size, filter_degree, min_z, max_z):
Log.info(f"[frame({frame_id})]Processing {scene} frame {frame_id}")
path = DataLoadUtil.get_path(root, scene, frame_id)
cam_info = DataLoadUtil.load_cam_info(path, binocular=True)
depth_L, depth_R = DataLoadUtil.load_depth(
path, cam_info["near_plane"],
cam_info["far_plane"],
binocular=True
)
mask_L, mask_R = DataLoadUtil.load_seg(path, binocular=True)
target_mask_img_L = (mask_L == target_mask_label).all(axis=-1)
target_mask_img_R = (mask_R == target_mask_label).all(axis=-1)
sampled_target_points_L = get_world_points(depth_L, target_mask_img_L, cam_info["cam_intrinsic"], cam_info["cam_to_world"], random_downsample_N)
sampled_target_points_R = get_world_points(depth_R, target_mask_img_R, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"], random_downsample_N)
has_points = sampled_target_points_L.shape[0] > 0 and sampled_target_points_R.shape[0] > 0
target_points = np.zeros((0, 3))
if has_points:
target_points = PtsUtil.get_overlapping_points(sampled_target_points_L, sampled_target_points_R, voxel_size)
if has_points and target_points.shape[0] > 0:
points_normals = DataLoadUtil.load_points_normals(root, scene, display_table_as_world_space_origin=True)
target_points = PtsUtil.filter_points(
target_points, points_normals, cam_info["cam_to_world"], voxel_size=voxel_size, theta=filter_degree, z_range=(min_z, max_z)
)
scan_points_indices_L = get_scan_points_indices(scan_points, mask_L, display_table_mask_label, cam_info["cam_intrinsic"], cam_info["cam_to_world"])
scan_points_indices_R = get_scan_points_indices(scan_points, mask_R, display_table_mask_label, cam_info["cam_intrinsic"], cam_info["cam_to_world_R"])
scan_points_indices = np.intersect1d(scan_points_indices_L, scan_points_indices_R)
save_target_points(root, scene, frame_id, target_points, file_type=file_type)
save_scan_points_indices(root, scene, frame_id, scan_points_indices, file_type=file_type)
def save_scene_data_multithread(root, scene, file_type="txt"):
target_mask_label = (0, 255, 0, 255)
display_table_mask_label = (0, 0, 255, 255)
random_downsample_N = 32768
voxel_size = 0.002
filter_degree = 75
min_z = 0.2
max_z = 0.5
display_table_info = DataLoadUtil.get_display_table_info(root, scene)
radius = display_table_info["radius"]
scan_points = np.asarray(ReconstructionUtil.generate_scan_points(display_table_top=0, display_table_radius=radius))
frame_num = DataLoadUtil.get_scene_seq_length(root, scene)
with ThreadPoolExecutor() as executor:
futures = {executor.submit(process_frame, frame_id, root, scene, scan_points, file_type, target_mask_label, display_table_mask_label, random_downsample_N, voxel_size, filter_degree, min_z, max_z): frame_id for frame_id in range(frame_num)}
for future in as_completed(futures):
frame_id = futures[future]
try:
future.result()
except Exception as e:
Log.error(f"Error processing frame {frame_id}: {e}")
save_scan_points(root, scene, scan_points) # The "done" flag of scene preprocess
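# Design note: ThreadPoolExecutor is a reasonable default here because each
# frame spends most of its time in NumPy and file IO, which release the GIL.
# If per-frame work were ever CPU-bound in pure Python, a hedged alternative
# sketch would be ProcessPoolExecutor with the same submit/as_completed
# pattern (assuming process_frame and its arguments are picklable):
#
#   from concurrent.futures import ProcessPoolExecutor
#   with ProcessPoolExecutor() as executor:
#       futures = {executor.submit(process_frame, fid, root, scene, scan_points,
#                                  file_type, target_mask_label, display_table_mask_label,
#                                  random_downsample_N, voxel_size, filter_degree,
#                                  min_z, max_z): fid for fid in range(frame_num)}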
if __name__ == "__main__":
#root = "/media/hofee/repository/new_data_with_normal"
root = r"/media/hofee/data/tempdir/test_real_output"
# list_path = r"/media/hofee/repository/full_list.txt"
# scene_list = []
# with open(list_path, "r") as f:
# for line in f:
# scene_list.append(line.strip())
scene_list = os.listdir(root)
from_idx = 0 # 1000
to_idx = 1 # 1500
cnt = 0
import time
total = to_idx - from_idx
for scene in scene_list[from_idx:to_idx]:
start = time.time()
save_scene_data(root, scene, file_type="npy")
cnt += 1
end = time.time()
print(f"({cnt}/{total}) Time cost: {end-start}")

utils/pts_util.py Normal file

@@ -0,0 +1,279 @@
import numpy as np
import open3d as o3d
import torch
import trimesh
from scipy.spatial import cKDTree
from utils.pose_util import PoseUtil
class PtsUtil:
@staticmethod
def voxel_downsample_point_cloud(point_cloud, voxel_size=0.005):
o3d_pc = o3d.geometry.PointCloud()
o3d_pc.points = o3d.utility.Vector3dVector(point_cloud)
downsampled_pc = o3d_pc.voxel_down_sample(voxel_size)
return np.asarray(downsampled_pc.points)
@staticmethod
def random_downsample_point_cloud(point_cloud, num_points, require_idx=False):
if point_cloud.shape[0] == 0:
if require_idx:
return point_cloud, np.array([])
return point_cloud
idx = np.random.choice(len(point_cloud), num_points, replace=True)
if require_idx:
return point_cloud[idx], idx
return point_cloud[idx]
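# Note: np.random.choice(..., replace=True) also *upsamples*: if the cloud has
# fewer than num_points points, indices repeat, so the output always has
# exactly num_points rows. An illustrative sketch (synthetic data):
#
#   pts = np.random.rand(10, 3)
#   out = PtsUtil.random_downsample_point_cloud(pts, 32)   # out.shape == (32, 3)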
@staticmethod
def fps_downsample_point_cloud(point_cloud, num_points, require_idx=False):
N = point_cloud.shape[0]
mask = np.zeros(N, dtype=bool)
sampled_indices = np.zeros(num_points, dtype=int)
sampled_indices[0] = np.random.randint(0, N)
distances = np.linalg.norm(
point_cloud - point_cloud[sampled_indices[0]], axis=1
)
for i in range(1, num_points):
farthest_index = np.argmax(distances)
sampled_indices[i] = farthest_index
mask[farthest_index] = True
new_distances = np.linalg.norm(
point_cloud - point_cloud[farthest_index], axis=1
)
distances = np.minimum(distances, new_distances)
sampled_points = point_cloud[sampled_indices]
if require_idx:
return sampled_points, sampled_indices
return sampled_points
@staticmethod
def random_downsample_point_cloud_tensor(point_cloud, num_points):
idx = torch.randint(0, len(point_cloud), (num_points,))
return point_cloud[idx]
@staticmethod
def voxelize_points(points, voxel_size):
voxel_indices = np.floor(points / voxel_size).astype(np.int32)
unique_voxels, inverse_indices = np.unique(voxel_indices, axis=0, return_inverse=True)
return unique_voxels, inverse_indices
@staticmethod
def transform_point_cloud(points, pose_mat):
points_h = np.concatenate([points, np.ones((points.shape[0], 1))], axis=1)
points_h = np.dot(pose_mat, points_h.T).T
return points_h[:, :3]
@staticmethod
def get_overlapping_points(
point_cloud_L, point_cloud_R, voxel_size=0.005, require_idx=False
):
voxels_L, indices_L = PtsUtil.voxelize_points(point_cloud_L, voxel_size)
voxels_R, _ = PtsUtil.voxelize_points(point_cloud_R, voxel_size)
voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3)
voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3)
overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
mask_L = np.isin(
indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0]
)
overlapping_points = point_cloud_L[mask_L]
if require_idx:
return overlapping_points, mask_L
return overlapping_points
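# How this works: the structured view ([("", dtype)] * 3) packs each voxel
# index row into a single record so np.intersect1d can intersect rows as if
# they were scalars; indices_L is the inverse map from points to their unique
# voxel, so mask_L selects every left point whose voxel also appears on the
# right. Illustrative call (hypothetical binocular clouds):
#
#   overlap = PtsUtil.get_overlapping_points(pts_L, pts_R, voxel_size=0.005)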
@staticmethod
def filter_points(
points,
points_normals,
cam_pose,
voxel_size=0.002,
theta=45,
z_range=(0.25, 0.5),
):
"""filter with z range"""
points_cam = PtsUtil.transform_point_cloud(points, np.linalg.inv(cam_pose))
idx = (points_cam[:, 2] > z_range[0]) & (points_cam[:, 2] < z_range[1])
z_filtered_points = points[idx]
""" filter with normal """
sampled_points = PtsUtil.voxel_downsample_point_cloud(
z_filtered_points, voxel_size
)
kdtree = cKDTree(points_normals[:, :3])
_, indices = kdtree.query(sampled_points)
nearest_points = points_normals[indices]
normals = nearest_points[:, 3:]
camera_axis = -cam_pose[:3, 2]
normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
cos_theta = np.dot(normals_normalized, camera_axis)
theta_rad = np.deg2rad(theta)
idx = cos_theta > np.cos(theta_rad)
filtered_sampled_points = sampled_points[idx]
return filtered_sampled_points[:, :3]
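# cos_theta compares each surface normal against the camera viewing axis
# (-cam_pose[:3, 2], the negated Z column of a cam-to-world pose); keeping
# cos_theta > cos(theta) drops grazing-angle points, whose depth is least
# reliable. With the defaults used by the preprocessing above (theta=75),
# only points whose normal lies within 75 degrees of the view direction
# survive. Illustrative call:
#
#   kept = PtsUtil.filter_points(pts, pts_with_normals, cam_pose, theta=75, z_range=(0.25, 0.5))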
@staticmethod
def multi_scale_icp(
source, target, voxel_size_range, init_transformation=None, steps=20
):
pipreg = o3d.pipelines.registration
if init_transformation is not None:
current_transformation = init_transformation
else:
current_transformation = np.identity(4)
cnt = 0
best_score = 1e10
best_reg = None
voxel_sizes = []
for i in range(steps):
voxel_sizes.append(
voxel_size_range[0]
+ i * (voxel_size_range[1] - voxel_size_range[0]) / steps
)
for voxel_size in voxel_sizes:
radius_normal = voxel_size * 2
source_downsampled = source.voxel_down_sample(voxel_size)
source_downsampled.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamHybrid(
radius=radius_normal, max_nn=30
)
)
target_downsampled = target.voxel_down_sample(voxel_size)
target_downsampled.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamHybrid(
radius=radius_normal, max_nn=30
)
)
reg_icp = pipreg.registration_icp(
source_downsampled,
target_downsampled,
voxel_size * 2,
current_transformation,
pipreg.TransformationEstimationPointToPlane(),
pipreg.ICPConvergenceCriteria(max_iteration=500),
)
cnt += 1
if reg_icp.fitness == 0:
score = 1e10
else:
score = reg_icp.inlier_rmse / reg_icp.fitness
if score < best_score:
best_score = score
best_reg = reg_icp
return best_reg, best_score
@staticmethod
def multi_scale_ransac(source_downsampled, target_downsampled, source_fpfh, model_fpfh, voxel_size_range, steps=20):
pipreg = o3d.pipelines.registration
cnt = 0
best_score = 1e10
best_reg = None
voxel_sizes = []
for i in range(steps):
voxel_sizes.append(
voxel_size_range[0]
+ i * (voxel_size_range[1] - voxel_size_range[0]) / steps
)
for voxel_size in voxel_sizes:
reg_ransac = pipreg.registration_ransac_based_on_feature_matching(
source_downsampled,
target_downsampled,
source_fpfh,
model_fpfh,
mutual_filter=True,
max_correspondence_distance=voxel_size*2,
estimation_method=pipreg.TransformationEstimationPointToPoint(False),
ransac_n=4,
checkers=[pipreg.CorrespondenceCheckerBasedOnEdgeLength(0.9)],
criteria=pipreg.RANSACConvergenceCriteria(8000000, 500),
)
cnt += 1
if reg_ransac.fitness == 0:
score = 1e10
else:
score = reg_ransac.inlier_rmse / reg_ransac.fitness
if score < best_score:
best_score = score
best_reg = reg_ransac
return best_reg, best_score
@staticmethod
def register(pcl: np.ndarray, model: trimesh.Trimesh, voxel_size=0.01):
radius_normal = voxel_size * 2
pipreg = o3d.pipelines.registration
model_pcd = o3d.geometry.PointCloud()
model_pcd.points = o3d.utility.Vector3dVector(model.vertices)
model_downsampled = model_pcd.voxel_down_sample(voxel_size)
model_downsampled.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamHybrid(
radius=radius_normal, max_nn=30
)
)
model_fpfh = pipreg.compute_fpfh_feature(
model_downsampled,
o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=100),
)
source_pcd = o3d.geometry.PointCloud()
source_pcd.points = o3d.utility.Vector3dVector(pcl)
source_downsampled = source_pcd.voxel_down_sample(voxel_size)
source_downsampled.estimate_normals(
search_param=o3d.geometry.KDTreeSearchParamHybrid(
radius=radius_normal, max_nn=30
)
)
source_fpfh = pipreg.compute_fpfh_feature(
source_downsampled,
o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=100),
)
reg_ransac, ransac_best_score = PtsUtil.multi_scale_ransac(
source_downsampled,
model_downsampled,
source_fpfh,
model_fpfh,
voxel_size_range=(0.03, 0.005),
steps=3,
)
reg_icp, icp_best_score = PtsUtil.multi_scale_icp(
source_downsampled,
model_downsampled,
voxel_size_range=(0.02, 0.001),
init_transformation=reg_ransac.transformation,
steps=50,
)
return reg_icp.transformation
@staticmethod
def get_pts_from_depth(depth, cam_intrinsic, cam_extrinsic):
h, w = depth.shape
i, j = np.meshgrid(np.arange(w), np.arange(h), indexing="xy")
z = depth
x = (i - cam_intrinsic[0, 2]) * z / cam_intrinsic[0, 0]
y = (j - cam_intrinsic[1, 2]) * z / cam_intrinsic[1, 1]
points_camera = np.stack((x, y, z), axis=-1).reshape(-1, 3)
points_camera_h = np.concatenate(
[points_camera, np.ones((points_camera.shape[0], 1))], axis=-1
)
points_world = np.dot(cam_extrinsic, points_camera_h.T).T[:, :3]
data = {
"points_world": points_world,
"points_camera": points_camera, # Nx3 camera-frame points; the homogeneous copy above is only an intermediate
}
return data
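# Minimal self-check sketch (assumption: run as a script, never on import);
# it exercises the NumPy/Open3D helpers above on synthetic data only.
if __name__ == "__main__":
rng_pts = np.random.rand(2048, 3)
down = PtsUtil.random_downsample_point_cloud(rng_pts, 512)
vox = PtsUtil.voxel_downsample_point_cloud(rng_pts, voxel_size=0.05)
overlap = PtsUtil.get_overlapping_points(rng_pts, rng_pts + 1e-4, voxel_size=0.05)
print(down.shape, vox.shape, overlap.shape)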


@@ -0,0 +1,195 @@
import numpy as np
from scipy.spatial import cKDTree
from utils.pts_util import PtsUtil
class ReconstructionUtil:
@staticmethod
def compute_coverage_rate(target_point_cloud, combined_point_cloud, threshold=0.01):
kdtree = cKDTree(combined_point_cloud)
distances, _ = kdtree.query(target_point_cloud)
covered_points_num = np.sum(distances < threshold*2)
coverage_rate = covered_points_num / target_point_cloud.shape[0]
return coverage_rate, covered_points_num
@staticmethod
def check_overlap(new_point_cloud, combined_point_cloud, overlap_area_threshold=25, voxel_size=0.01):
kdtree = cKDTree(combined_point_cloud)
distances, _ = kdtree.query(new_point_cloud)
overlapping_points = np.sum(distances < voxel_size*2)
cm = 0.01
voxel_size_cm = voxel_size / cm
overlap_area = overlapping_points * voxel_size_cm * voxel_size_cm
return overlap_area > overlap_area_threshold
@staticmethod
def get_new_added_points(old_combined_pts, new_pts, threshold=0.005):
if old_combined_pts.size == 0:
return new_pts
if new_pts.size == 0:
return np.array([])
tree = cKDTree(old_combined_pts)
distances, _ = tree.query(new_pts, k=1)
new_added_points = new_pts[distances > threshold]
return new_added_points
@staticmethod
def compute_next_best_view_sequence_with_overlap(target_point_cloud, point_cloud_list, scan_points_indices_list, threshold=0.01, soft_overlap_threshold=0.5, hard_overlap_threshold=0.7, init_view = 0, scan_points_threshold=5, status_info=None):
selected_views = [init_view]
combined_point_cloud = point_cloud_list[init_view]
history_indices = [scan_points_indices_list[init_view]]
max_rec_pts = np.vstack(point_cloud_list)
downsampled_max_rec_pts = PtsUtil.voxel_downsample_point_cloud(max_rec_pts, threshold)
max_rec_pts_num = downsampled_max_rec_pts.shape[0]
max_real_rec_pts_coverage, _ = ReconstructionUtil.compute_coverage_rate(target_point_cloud, downsampled_max_rec_pts, threshold)
new_coverage, new_covered_num = ReconstructionUtil.compute_coverage_rate(downsampled_max_rec_pts, combined_point_cloud, threshold)
current_coverage = new_coverage
current_covered_num = new_covered_num
remaining_views = list(range(len(point_cloud_list)))
view_sequence = [(init_view, current_coverage)]
cnt_processed_view = 0
remaining_views.remove(init_view)
curr_rec_pts_num = combined_point_cloud.shape[0]
while remaining_views:
best_view = None
best_coverage_increase = -1
best_combined_point_cloud = None
best_covered_num = 0
for view_index in remaining_views:
if point_cloud_list[view_index].shape[0] == 0:
continue
if selected_views:
new_scan_points_indices = scan_points_indices_list[view_index]
if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold):
overlap_threshold = hard_overlap_threshold
else:
overlap_threshold = soft_overlap_threshold
overlap_rate = ReconstructionUtil.compute_overlap_rate(point_cloud_list[view_index], combined_point_cloud, threshold)
if overlap_rate < overlap_threshold:
continue
new_combined_point_cloud = np.vstack([combined_point_cloud, point_cloud_list[view_index]])
new_downsampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(new_combined_point_cloud, threshold)
new_coverage, new_covered_num = ReconstructionUtil.compute_coverage_rate(downsampled_max_rec_pts, new_downsampled_combined_point_cloud, threshold)
coverage_increase = new_coverage - current_coverage
if coverage_increase > best_coverage_increase:
best_coverage_increase = coverage_increase
best_view = view_index
best_covered_num = new_covered_num
best_combined_point_cloud = new_downsampled_combined_point_cloud
if best_view is not None:
if best_coverage_increase <= 1e-3 or best_covered_num - current_covered_num <= 5:
break
selected_views.append(best_view)
best_rec_pts_num = best_combined_point_cloud.shape[0]
print(f"Current rec pts num: {curr_rec_pts_num}, Best rec pts num: {best_rec_pts_num}, Best cover pts: {best_covered_num}, Max rec pts num: {max_rec_pts_num}")
print(f"Current coverage: {current_coverage}, Best coverage increase: {best_coverage_increase}, Max Real coverage: {max_real_rec_pts_coverage}")
current_covered_num = best_covered_num
curr_rec_pts_num = best_rec_pts_num
combined_point_cloud = best_combined_point_cloud
remaining_views.remove(best_view)
history_indices.append(scan_points_indices_list[best_view])
current_coverage += best_coverage_increase
cnt_processed_view += 1
if status_info is not None:
sm = status_info["status_manager"]
app_name = status_info["app_name"]
runner_name = status_info["runner_name"]
sm.set_status(app_name, runner_name, "current coverage", current_coverage)
sm.set_progress(app_name, runner_name, "processed view", cnt_processed_view, len(point_cloud_list))
view_sequence.append((best_view, current_coverage))
else:
break
if status_info is not None:
sm = status_info["status_manager"]
app_name = status_info["app_name"]
runner_name = status_info["runner_name"]
sm.set_progress(app_name, runner_name, "processed view", len(point_cloud_list), len(point_cloud_list))
return view_sequence, remaining_views, combined_point_cloud
@staticmethod
def compute_next_best_view_with_overlap(scanned_pts, point_cloud_list, history_indices, scan_points_indices_list, threshold=0.01, overlap_area_threshold=25, scan_points_threshold=5):
max_rec_pts = np.vstack(point_cloud_list)
downsampled_max_rec_pts = PtsUtil.voxel_downsample_point_cloud(max_rec_pts, threshold)
best_view = None
best_coverage = -1
best_covered_num = 0
for view in range(len(point_cloud_list)):
if point_cloud_list[view].shape[0] == 0:
continue
new_scan_points_indices = scan_points_indices_list[view]
if not ReconstructionUtil.check_scan_points_overlap(history_indices, new_scan_points_indices, scan_points_threshold):
curr_overlap_area_threshold = overlap_area_threshold
else:
curr_overlap_area_threshold = overlap_area_threshold * 0.5
if not ReconstructionUtil.check_overlap(point_cloud_list[view], scanned_pts, overlap_area_threshold=curr_overlap_area_threshold, voxel_size=threshold):
continue
new_combined_point_cloud = np.vstack([scanned_pts, point_cloud_list[view]])
new_downsampled_combined_point_cloud = PtsUtil.voxel_downsample_point_cloud(new_combined_point_cloud, threshold)
new_coverage, new_covered_num = ReconstructionUtil.compute_coverage_rate(downsampled_max_rec_pts, new_downsampled_combined_point_cloud, threshold)
if new_coverage > best_coverage:
best_coverage = new_coverage
best_covered_num = new_covered_num
best_view = view
return best_view, best_coverage, best_covered_num
@staticmethod
def generate_scan_points(display_table_top, display_table_radius, min_distance=0.03, max_points_num = 500, max_attempts = 1000):
points = []
attempts = 0
while len(points) < max_points_num and attempts < max_attempts:
angle = np.random.uniform(0, 2 * np.pi)
r = np.random.uniform(0, display_table_radius)
x = r * np.cos(angle)
y = r * np.sin(angle)
z = display_table_top
new_point = (x, y, z)
if all(np.linalg.norm(np.array(new_point) - np.array(existing_point)) >= min_distance for existing_point in points):
points.append(new_point)
attempts += 1
return points
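# This is rejection sampling with a minimum pairwise distance (a simple
# Poisson-disk-style scatter on the table top): up to max_attempts random
# polar samples are drawn inside the disc, and a candidate is kept only if it
# is at least min_distance from every accepted point, so fewer than
# max_points_num points may come back. Illustrative call:
#
#   scan_points = ReconstructionUtil.generate_scan_points(0.0, 0.25)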
@staticmethod
def compute_covered_scan_points(scan_points, point_cloud, threshold=0.01):
tree = cKDTree(point_cloud)
covered_points = []
indices = []
for i, scan_point in enumerate(scan_points):
if tree.query_ball_point(scan_point, threshold):
covered_points.append(scan_point)
indices.append(i)
return covered_points, indices
@staticmethod
def check_scan_points_overlap(history_indices, indices2, threshold=5):
if len(indices2) == 0:
return False
for indices1 in history_indices:
if len(set(indices1).intersection(set(indices2))) >= threshold:
return True
return False
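# Minimal self-check sketch (assumption: run as a script); synthetic clouds
# exercise the coverage and scan-point helpers above.
if __name__ == "__main__":
target = np.random.rand(1000, 3)
partial = target[:600] + np.random.normal(0, 0.001, (600, 3))
coverage, covered = ReconstructionUtil.compute_coverage_rate(target, partial, threshold=0.01)
print(f"coverage={coverage:.3f}, covered={covered}")
scan_points = np.asarray(ReconstructionUtil.generate_scan_points(0.0, 0.25))
_, idx = ReconstructionUtil.compute_covered_scan_points(scan_points, partial, threshold=0.05)
print(f"{len(idx)} scan points covered")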

utils/render_util.py Normal file

@@ -0,0 +1,45 @@
import os
import json
import subprocess
import tempfile
import shutil
from utils.data_load import DataLoadUtil
from utils.pts_util import PtsUtil
class RenderUtil:
@staticmethod
def render_pts(cam_pose, scene_path, script_path, model_points_normals, voxel_threshold=0.005, filter_degree=75, nO_to_nL_pose=None, require_full_scene=False):
nO_to_world_pose = DataLoadUtil.get_real_cam_O_from_cam_L(cam_pose, nO_to_nL_pose, scene_path=scene_path)
with tempfile.TemporaryDirectory() as temp_dir:
params = {
"cam_pose": nO_to_world_pose.tolist(),
"object_name": scene_path
}
params_data_path = os.path.join(temp_dir, "params.json")
with open(params_data_path, 'w') as f:
json.dump(params, f)
result = subprocess.run([
'blender', '-b', '-P', script_path, '--', temp_dir
], capture_output=True, text=True)
if result.returncode != 0:
print("Blender script failed:")
print(result.stderr)
return None
path = os.path.join(temp_dir, "tmp")
point_cloud = DataLoadUtil.get_target_point_cloud_world_from_path(path, binocular=True)
cam_params = DataLoadUtil.load_cam_info(path, binocular=True)
filtered_point_cloud = PtsUtil.filter_points(point_cloud, model_points_normals, cam_pose=cam_params["cam_to_world"], voxel_size=voxel_threshold, theta=filter_degree)
full_scene_point_cloud = None
if require_full_scene:
depth_L, depth_R = DataLoadUtil.load_depth(path, cam_params['near_plane'], cam_params['far_plane'], binocular=True)
point_cloud_L = DataLoadUtil.get_point_cloud(depth_L, cam_params['cam_intrinsic'], cam_params['cam_to_world'])['points_world']
point_cloud_R = DataLoadUtil.get_point_cloud(depth_R, cam_params['cam_intrinsic'], cam_params['cam_to_world_R'])['points_world']
point_cloud_L = PtsUtil.random_downsample_point_cloud(point_cloud_L, 65536)
point_cloud_R = PtsUtil.random_downsample_point_cloud(point_cloud_R, 65536)
full_scene_point_cloud = PtsUtil.get_overlapping_points(point_cloud_L, point_cloud_R)
return filtered_point_cloud, full_scene_point_cloud
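# Note: this helper shells out to a `blender -b -P <script>` subprocess, so a
# Blender binary must be on PATH. A hedged pre-flight check could reuse the
# already-imported shutil before calling render_pts:
#
#   if shutil.which("blender") is None:
#       raise RuntimeError("blender not found on PATH")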

utils/view_util.py Normal file

@@ -0,0 +1,127 @@
import numpy as np
from scipy.spatial.transform import Rotation as R
from dataclasses import dataclass
@dataclass
class CameraIntrinsics:
width: int
height: int
fx: float
fy: float
cx: float
cy: float
@property
def intrinsic_matrix(self):
return np.array([[self.fx, 0, self.cx], [0, self.fy, self.cy], [0, 0, 1]])
class CameraExtrinsics:
def __init__(self, rotation: np.ndarray, translation: np.ndarray, rot_type: str):
"""
rotation: 3x3 rotation matrix or 1x3 euler angles or 1x4 quaternion
translation: 1x3 or 3x1 translation vector
rot_type: "mat", "euler_xyz", "quat_xyzw"
"""
assert rot_type in ["mat", "euler_xyz", "quat_xyzw"]
if rot_type == "mat":
self._rot = R.from_matrix(rotation)
elif rot_type == "euler_xyz":
self._rot = R.from_euler('xyz', rotation, degrees=True)
elif rot_type == "quat_xyzw":
self._rot = R.from_quat(rotation)
self._translation = translation
@property
def extrinsic_matrix(self):
return np.vstack([np.hstack([self._rot.as_matrix(), self._translation.reshape(3, 1)]), [0, 0, 0, 1]])
@property
def rotation_euler_xyz(self):
return self._rot.as_euler('xyz', degrees=True)
@property
def rotation_quat_xyzw(self):
return self._rot.as_quat()
@property
def rotation_matrix(self):
return self._rot.as_matrix()
@property
def translation(self):
return self._translation
class CameraData:
def __init__(self, depth_image: np.ndarray, image_id: int, intrinsics: CameraIntrinsics, extrinsics: CameraExtrinsics):
self._depth_image = depth_image
self._image_id = image_id
self._intrinsics = intrinsics
self._extrinsics = extrinsics
@property
def depth_image(self):
return self._depth_image
@property
def image_id(self):
return self._image_id
@property
def intrinsics(self):
return self._intrinsics.intrinsic_matrix
@property
def extrinsics(self):
return self._extrinsics.extrinsic_matrix
@property
def projection_matrix(self):
return self.intrinsics @ self.extrinsics[:3, :4]
@property
def pts_camera(self):
height, width = self.depth_image.shape
v, u = np.indices((height, width))
points = np.vstack([u.flatten(), v.flatten(), np.ones_like(u.flatten())]) # 3xN
points = np.linalg.inv(self.intrinsics) @ points # 3xN
points = points.T # Nx3
points = points * self.depth_image.flatten()[:, np.newaxis] # Nx3
points = points[points[:, 2] > 0] # Nx3
return points
@property
def pts_world(self):
homogeneous_pts = np.hstack([self.pts_camera, np.ones((self.pts_camera.shape[0], 1))]) # Nx4
pts_world = self.extrinsics @ homogeneous_pts.T # 4xN
return pts_world[:3, :].T
class ViewUtil:
@staticmethod
def get_pts(view_data):
image_id = view_data["image_id"]
depth_intrinsics = view_data["depth_intrinsics"]
depth_extrinsics = view_data["depth_extrinsics"]
depth_image = np.array(view_data["depth_image"], dtype=np.uint16)
if image_id is None:
return None
else:
camera_intrinsics = CameraIntrinsics(
depth_intrinsics['width'],
depth_intrinsics['height'],
depth_intrinsics['fx'],
depth_intrinsics['fy'],
depth_intrinsics['cx'],
depth_intrinsics['cy']
)
camera_extrinsics = CameraExtrinsics(
depth_extrinsics[:3, :3],
depth_extrinsics[:3, 3],
rot_type="mat"
)
camera_data = CameraData(depth_image, image_id, camera_intrinsics, camera_extrinsics)
pts = camera_data.pts_world
return pts/1000
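# Minimal sketch (assumption: run as a script) of the camera plumbing above:
# back-project a tiny synthetic depth image to world space.
if __name__ == "__main__":
intr = CameraIntrinsics(width=4, height=4, fx=2.0, fy=2.0, cx=2.0, cy=2.0)
extr = CameraExtrinsics(np.eye(3), np.zeros(3), rot_type="mat")
depth = np.full((4, 4), 1000, dtype=np.uint16) # 1 m at millimeter scale
cam = CameraData(depth, image_id=0, intrinsics=intr, extrinsics=extr)
print(cam.pts_world.shape) # (16, 3), in the same units as the depth image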

vis_pts_and_nrm.py Normal file

@@ -0,0 +1,54 @@
# import numpy as np
# import matplotlib.pyplot as plt
# from mpl_toolkits.mplot3d import Axes3D
# # Assume points_and_normals is your Nx6 matrix
# # the first three columns are point coordinates, the last three are normals
# points_and_normals = np.loadtxt("/Users/hofee/Downloads/temp_output/cad_model_world/points_and_normals.txt")  # stand-in for your own data
# points = points_and_normals[:100, :3]
# normals = points_and_normals[:100, 3:]
# # create the 3D figure
# fig = plt.figure()
# ax = fig.add_subplot(111, projection='3d')
# # plot the point cloud
# ax.scatter(points[:, 0], points[:, 1], points[:, 2], color='b', marker='o')
# # draw the normals (a short arrow from each point)
# ax.quiver(points[:, 0], points[:, 1], points[:, 2],
#           normals[:, 0], normals[:, 1], normals[:, 2], length=0.1, color='r')
# plt.show()
import numpy as np
# Assume points_and_normals is your Nx6 matrix
# points_and_normals[:,:3] are the point coordinates
# points_and_normals[:,3:] are the normals
points_and_normals = np.loadtxt("/Users/hofee/Downloads/temp_output/cad_model_world/points_and_normals.txt")  # stand-in for your own data
print(points_and_normals.shape)
points = points_and_normals[300:400, :3]
normals = points_and_normals[300:400, 3:]
# set the distance range and sample count along each normal
num_samples_per_point = 20  # samples per normal direction
sampling_distances = np.linspace(0, 0.5, num_samples_per_point)  # sampling distance range
# collect the sampled points
sampled_points = []
# sample along the normal of each point
for point, normal in zip(points, normals):
for dist in sampling_distances:
# offset the point along its normal
sampled_point = point + dist * normal
sampled_points.append(sampled_point)
# convert to a numpy array
sampled_points = np.array(sampled_points)
# save as a point-cloud file (e.g. .txt or .xyz)
np.savetxt('sampled_points.txt', sampled_points)
print("Sampled point cloud saved to 'sampled_points.txt'")