debug sim
This commit is contained in:
parent
e6650e4042
commit
03bed63e8d
1
.gitignore
vendored
1
.gitignore
vendored
@ -5,6 +5,7 @@ __pycache__/
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
*.txt
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
|
@ -1,11 +1,16 @@
|
||||
center: [0.5, 0.0, 0.25]
|
||||
center: [0.45, 0.0, 0.25]
|
||||
q: [0.0, -1.39, 0.0, -2.36, 0.0, 1.57, 0.79]
|
||||
objects:
|
||||
- object_id: ycb/006_mustard_bottle
|
||||
xyz: [0.1, 0.0, 0.03]
|
||||
xyz: [0.0, 0.0, 0.1]
|
||||
rpy: [0, 0, 0]
|
||||
scale: 0.8
|
||||
- object_id: active_perception/box
|
||||
xyz: [0.1, 0.0, 0.0]
|
||||
xyz: [-0.1, -0.15, 0.01]
|
||||
rpy: [0, 0, 0]
|
||||
scale: 0.9
|
||||
scale: 2.0
|
||||
|
||||
- object_id: active_perception/box
|
||||
xyz: [-0.1, -0.15, 0.05]
|
||||
rpy: [0, 45, 0]
|
||||
scale: 1.5
|
29994
gsnet_input_points.txt
29994
gsnet_input_points.txt
File diff suppressed because it is too large
Load Diff
@ -12,7 +12,7 @@ settings:
|
||||
experiment:
|
||||
name: test_inference
|
||||
root_dir: "experiments"
|
||||
model_path: "/home/zhengxiao-han/Downloads/full_149_241009.pth"
|
||||
model_path: "/media/hofee/data/weights/nbv_grasping/full_149_241009.pth"
|
||||
use_cache: True
|
||||
small_batch_overfit: False
|
||||
|
||||
|
@ -17,6 +17,7 @@ from geometry_msgs.msg import Pose
|
||||
import tf
|
||||
from robot_helpers.ros import tf as rhtf
|
||||
from sklearn.neighbors import NearestNeighbors
|
||||
from .utils.pts import PtsUtil
|
||||
|
||||
import sensor_msgs.point_cloud2 as pc2
|
||||
from sensor_msgs.msg import PointCloud2, PointField
|
||||
@ -94,11 +95,11 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
|
||||
# Visualize Initial Camera Pose
|
||||
self.vis_cam_pose(x)
|
||||
|
||||
# When policy hasn't produced an available grasp
|
||||
self.unreachable_poses = []
|
||||
while(self.updated == False):
|
||||
# Clear visualization points
|
||||
self.publish_pointcloud([[0,0,0]])
|
||||
self.publish_pointcloud(np.zeros((1, 3)))
|
||||
|
||||
# Inference with our model
|
||||
self.target_points, self.scene_points, _ = self.depth_image_to_ap_input(img,
|
||||
@ -109,126 +110,79 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
target_sample_num=1024)
|
||||
ap_input = {'target_pts': self.target_points,
|
||||
'scene_pts': self.scene_points}
|
||||
# save point cloud
|
||||
# target_points = self.target_points.cpu().numpy()[0,:,:]
|
||||
self.scene_points_cache = self.scene_points.cpu().numpy()[0]
|
||||
self.cam_pose_cache = rhtf.lookup(self.base_frame, self.cam_frame, rospy.Time.now()).as_matrix()
|
||||
|
||||
self.publish_pointcloud(self.scene_points_cache)
|
||||
|
||||
# time.sleep(10000000)
|
||||
# np.savetxt("target_points.txt", target_points, delimiter=",")
|
||||
# np.savetxt("scene_points.txt", scene_points, delimiter=",")
|
||||
|
||||
ap_output = self.ap_inference_engine.inference(ap_input)
|
||||
delta_rot_6d = ap_output['estimated_delta_rot_6d']
|
||||
|
||||
current_cam_pose = torch.from_numpy(x.as_matrix()).float().to("cuda:0")
|
||||
src_cam_to_world_mat = torch.from_numpy(x.as_matrix()).float().to("cuda:0")
|
||||
est_delta_rot_mat = self.rotation_6d_to_matrix_tensor_batch(delta_rot_6d)[0]
|
||||
|
||||
target_points_np = self.target_points.cpu().numpy()[0,:,:]
|
||||
central_point_of_target = np.mean(target_points_np, axis=0)
|
||||
look_at_center = torch.from_numpy(central_point_of_target).float().to("cuda:0")
|
||||
look_at_center_cam = np.mean(target_points_np, axis=0)
|
||||
# Convert look_at_center's reference frame to arm frame
|
||||
look_at_center_T = np.eye(4)
|
||||
look_at_center_T[:3, 3] = look_at_center.cpu().numpy()
|
||||
look_at_center_T = current_cam_pose.cpu().numpy() @ look_at_center_T
|
||||
look_at_center = torch.from_numpy(look_at_center_T[:3, 3]).float().to("cuda:0")
|
||||
look_at_center_cam_homogeneous = np.concatenate([look_at_center_cam, np.ones(1)])
|
||||
look_at_center_world = (src_cam_to_world_mat.cpu().numpy() @ look_at_center_cam_homogeneous)[:3]
|
||||
look_at_center_world = torch.from_numpy(look_at_center_world).float().to("cuda:0")
|
||||
# Get the NBV
|
||||
nbv_tensor = self.get_transformed_mat(current_cam_pose,
|
||||
dst_cam_to_world_mat = self.get_transformed_mat(src_cam_to_world_mat,
|
||||
est_delta_rot_mat,
|
||||
look_at_center)
|
||||
nbv_numpy = nbv_tensor.cpu().numpy()
|
||||
nbv_transform = Transform.from_matrix(nbv_numpy)
|
||||
x_d = nbv_transform
|
||||
look_at_center_world)
|
||||
dst_cam_to_world_mat_numpy = dst_cam_to_world_mat.cpu().numpy()
|
||||
dst_transform = Transform.from_matrix(dst_cam_to_world_mat_numpy)
|
||||
x_d = dst_transform
|
||||
|
||||
# Check if this pose available
|
||||
print("found a NBV pose")
|
||||
if(self.solve_cam_ik(self.q0, x_d)):
|
||||
self.vis_cam_pose(x_d)
|
||||
self.x_d = x_d
|
||||
self.updated = True
|
||||
print("Found an NBV!")
|
||||
print("the NBV pose is reachable")
|
||||
return
|
||||
else:
|
||||
self.unreachable_poses.append(x_d)
|
||||
self.vis_unreachable_pose(self.unreachable_poses)
|
||||
print("the NBV pose is not reachable")
|
||||
|
||||
# Policy has produced an available nbv and moved to that camera pose
|
||||
if(self.updated == True):
|
||||
# Request grasping poses from GSNet
|
||||
self.target_points, self.scene_points, self.all_points = \
|
||||
self.depth_image_to_ap_input(img, seg, target_id, support_id)
|
||||
target_points_list = np.asarray(self.target_points.cpu().numpy())[0].tolist()
|
||||
central_point_of_target = np.mean(target_points_list, axis=0)
|
||||
|
||||
|
||||
central_point_of_target = np.mean(self.target_points[0].cpu().numpy(), axis=0)
|
||||
|
||||
# Crop points to desired number of points
|
||||
num_points_valid = False
|
||||
target_points_radius = self.crop_min_radius
|
||||
required_num_points = 15000
|
||||
# Merge all points and scene points
|
||||
|
||||
self.all_points = np.asarray(self.all_points.cpu().numpy())[0]
|
||||
all_points_list = self.all_points.tolist()
|
||||
scene_points_list = self.scene_points_cache.tolist()
|
||||
|
||||
# convert scene_points to current frame
|
||||
self.cam_pose = rhtf.lookup(self.base_frame, self.cam_frame, rospy.Time.now()).as_matrix()
|
||||
for idx, point in enumerate(scene_points_list):
|
||||
point = np.asarray(point)
|
||||
T_point_cam_old = np.array([[1, 0, 0, point[0]],
|
||||
[0, 1, 0, point[1]],
|
||||
[0, 0, 1, point[2]],
|
||||
[0, 0, 0, 1]])
|
||||
# point in arm frame
|
||||
T_point_arm = np.matmul(self.cam_pose_cache, T_point_cam_old)
|
||||
# point in camera frame
|
||||
T_point_cam_new = np.matmul(np.linalg.inv(self.cam_pose),
|
||||
T_point_arm)
|
||||
point = T_point_cam_new[:3, 3].T
|
||||
point = point.tolist()
|
||||
scene_points_list[idx] = point
|
||||
|
||||
merged_points_list = scene_points_list + all_points_list
|
||||
merged_points_list = np.asarray(merged_points_list)
|
||||
print("merged_points_list shape: "+str(merged_points_list.shape))
|
||||
merged_points = self.all_points
|
||||
|
||||
while not num_points_valid:
|
||||
gsnet_input_points = self.crop_pts_sphere(merged_points_list,
|
||||
gsnet_input_points = self.crop_pts_sphere(merged_points,
|
||||
central_point_of_target,
|
||||
radius=target_points_radius)
|
||||
if(len(gsnet_input_points) >= required_num_points):
|
||||
num_points_valid = True
|
||||
# downsample points
|
||||
gsnet_input_points = np.asarray(gsnet_input_points)
|
||||
gsnet_input_points = gsnet_input_points[np.random.choice(gsnet_input_points.shape[0], required_num_points, replace=False)]
|
||||
gsnet_input_points = PtsUtil.random_downsample_point_cloud(gsnet_input_points, required_num_points)
|
||||
else:
|
||||
target_points_radius += self.crop_radius_step
|
||||
if(target_points_radius > self.crop_max_radius):
|
||||
print("Target points radius exceeds maximum radius")
|
||||
print("Number of points: "+str(len(gsnet_input_points)))
|
||||
print("Interpolating points")
|
||||
# Interpolate points
|
||||
if(len(gsnet_input_points) < self.num_knn_neighbours+1):
|
||||
self.best_grasp = None
|
||||
self.vis.clear_grasp()
|
||||
self.done = True
|
||||
return
|
||||
else:
|
||||
gsnet_input_points = np.asarray(gsnet_input_points)
|
||||
num_interpolation = required_num_points - len(gsnet_input_points)
|
||||
gsnet_input_points = self.interpolate_point_cloud(gsnet_input_points, num_interpolation)
|
||||
num_points_valid = True
|
||||
gsnet_input_points = gsnet_input_points.tolist()
|
||||
|
||||
|
||||
# gsnet_input_points = target_points_list
|
||||
# gsnet_input_points = merged_points_list
|
||||
num_points_valid = True
|
||||
gsnet_input_points = PtsUtil.random_downsample_point_cloud(gsnet_input_points, required_num_points)
|
||||
|
||||
self.publish_pointcloud(gsnet_input_points)
|
||||
|
||||
# save point cloud as .txt
|
||||
np.savetxt("gsnet_input_points.txt", gsnet_input_points, delimiter=",")
|
||||
|
||||
received_points = False
|
||||
while(received_points == False):
|
||||
gsnet_grasping_poses = np.asarray(self.request_grasping_pose(gsnet_input_points))
|
||||
received_points = True
|
||||
print(gsnet_grasping_poses[0].keys())
|
||||
|
||||
gsnet_grasping_poses = np.asarray(self.request_grasping_pose(gsnet_input_points.tolist()))
|
||||
print(gsnet_grasping_poses[0].keys())
|
||||
import ipdb; ipdb.set_trace()
|
||||
|
||||
# DEBUG: publish grasps
|
||||
# self.publish_grasps(gsnet_grasping_poses)
|
||||
@ -237,8 +191,6 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
current_cam_pose = torch.from_numpy(x.as_matrix()).float().to("cuda:0")
|
||||
for gg in gsnet_grasping_poses:
|
||||
gg['T'] = current_cam_pose.cpu().numpy().dot(np.asarray(gg['T']))
|
||||
# Now here is a mysterous bug, the grasping poses have to be rotated
|
||||
# 90 degrees around y-axis to be in the correct reference frame
|
||||
R = np.array([[0, 0, 1], [0, 1, 0], [-1, 0, 0]])
|
||||
gg['T'][:3, :3] = gg['T'][:3, :3].dot(R)
|
||||
|
||||
@ -276,7 +228,6 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
self.best_grasp = None
|
||||
self.vis.clear_grasp()
|
||||
self.done = True
|
||||
# self.publish_pointcloud([[0,0,0]])
|
||||
|
||||
def publish_grasps(self, gg):
|
||||
marker_array = MarkerArray()
|
||||
@ -309,7 +260,6 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
self.grasp_publisher.publish(marker_array)
|
||||
|
||||
def publish_pointcloud(self, point_cloud):
|
||||
point_cloud = np.asarray(point_cloud)
|
||||
cloud_msg = self.create_pointcloud_msg(point_cloud)
|
||||
self.pcd_publisher.publish(cloud_msg)
|
||||
|
||||
@ -333,7 +283,7 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
|
||||
def crop_pts_sphere(self, points, crop_center, radius=0.2):
|
||||
crop_mask = np.linalg.norm(points - crop_center, axis=1) < radius
|
||||
return points[crop_mask].tolist()
|
||||
return points[crop_mask]
|
||||
|
||||
def deactivate(self):
|
||||
self.vis.clear_ig_views()
|
||||
@ -343,43 +293,21 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
# Integrate
|
||||
self.views.append(x)
|
||||
self.vis.path(self.base_frame, self.intrinsic, self.views)
|
||||
|
||||
def vis_unreachable_pose(self, poses):
|
||||
# red_color with look at direction
|
||||
self.vis.unreachable_cam_views(self.base_frame, self.intrinsic, poses)
|
||||
|
||||
|
||||
def vis_scene_cloud(self, img, x):
|
||||
self.tsdf.integrate(img, self.intrinsic, x.inv() * self.T_base_task)
|
||||
scene_cloud = self.tsdf.get_scene_cloud()
|
||||
self.vis.scene_cloud(self.task_frame, np.asarray(scene_cloud.points))
|
||||
|
||||
def generate_grasp(self, q):
|
||||
tsdf_grid = self.tsdf.get_grid()
|
||||
out = self.vgn.predict(tsdf_grid)
|
||||
self.vis.quality(self.task_frame, self.tsdf.voxel_size, out.qual, 0.9)
|
||||
grasps, qualities = self.filter_grasps(out, q)
|
||||
|
||||
if len(grasps) > 0:
|
||||
self.best_grasp, quality = self.select_best_grasp(grasps, qualities)
|
||||
self.vis.grasp(self.base_frame, self.best_grasp, quality)
|
||||
else:
|
||||
self.best_grasp = None
|
||||
self.vis.clear_grasp()
|
||||
|
||||
|
||||
def select_best_grasp(self, grasps, qualities):
|
||||
i = np.argmax(qualities)
|
||||
return grasps[i], qualities[i]
|
||||
|
||||
# pose = rhtf.lookup(self.base_frame, "panda_link8", rospy.Time.now())
|
||||
# ee_matrix = pose.as_matrix()
|
||||
# minimum_difference = np.inf
|
||||
# for i in range(len(grasps)-1, -1, -1):
|
||||
# grasp = grasps[i]
|
||||
# g_matrix = grasp.pose.as_matrix()
|
||||
# # calculatr the Frobenius norm (rotation difference)
|
||||
# rotation_difference = np.linalg.norm(ee_matrix[:3, :3] - g_matrix[:3, :3])
|
||||
# if rotation_difference < minimum_difference:
|
||||
# minimum_difference = rotation_difference
|
||||
# best_grasp = grasp
|
||||
# best_quality = qualities[i]
|
||||
# return best_grasp, best_quality
|
||||
|
||||
def rotation_6d_to_matrix_tensor_batch(self, d6: torch.Tensor) -> torch.Tensor:
|
||||
a1, a2 = d6[..., :3], d6[..., 3:]
|
||||
@ -402,7 +330,7 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
return dst_mat
|
||||
|
||||
def depth_image_to_ap_input(self, depth_img, seg_img, target_id, support_id,
|
||||
scene_sample_num=-1, target_sample_num=-1):
|
||||
scene_sample_num=16384, target_sample_num=1024):
|
||||
target_points = []
|
||||
scene_points = []
|
||||
all_points = []
|
||||
@ -423,47 +351,22 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
z_mat = depth_img
|
||||
x_mat = x_factors * z_mat
|
||||
y_mat = y_factors * z_mat
|
||||
for i in range(img_shape[0]):
|
||||
for j in range(img_shape[1]):
|
||||
seg_id = seg_img[i, j]
|
||||
x = x_mat[i][j]
|
||||
y = y_mat[i][j]
|
||||
z = z_mat[i][j]
|
||||
|
||||
if(int(seg_id) != int(255)): # no background points
|
||||
all_points.append([x,y,z])
|
||||
if(int(seg_id) != int(support_id)): # no support points
|
||||
# This pixel belongs to the scene
|
||||
scene_points.append([x,y,z])
|
||||
if(int(seg_id) == int(target_id)):
|
||||
# This pixel belongs to the target object to be grasped
|
||||
target_points.append([x,y,z])
|
||||
|
||||
# Sample points
|
||||
all_points = np.asarray(all_points)
|
||||
target_points = np.asarray(target_points)
|
||||
scene_points = np.asarray(scene_points)
|
||||
if scene_sample_num > 0:
|
||||
if scene_points.shape[0] < scene_sample_num:
|
||||
print("Scene points are less than the required sample number")
|
||||
num_interpolation = scene_sample_num - scene_points.shape[0]
|
||||
scene_points = self.interpolate_point_cloud(scene_points, num_interpolation)
|
||||
print("Interpolated scene points. Shape: "+str(scene_points.shape))
|
||||
else:
|
||||
scene_points = scene_points[np.random.choice(scene_points.shape[0], scene_sample_num, replace=False)]
|
||||
|
||||
if target_sample_num > 0:
|
||||
if target_points.shape[0] < target_sample_num:
|
||||
print("Target points are less than the required sample number")
|
||||
num_interpolation = target_sample_num - target_points.shape[0]
|
||||
target_points = self.interpolate_point_cloud(target_points, num_interpolation)
|
||||
print("Interpolated target points. Shape: "+str(target_points.shape))
|
||||
else:
|
||||
target_points = target_points[np.random.choice(target_points.shape[0], target_sample_num, replace=False)]
|
||||
z_mat_flat = z_mat.flatten()
|
||||
seg_img_flat = seg_img.flatten()
|
||||
x_mat_flat = x_mat.flatten()
|
||||
y_mat_flat = y_mat.flatten()
|
||||
|
||||
# reshape points
|
||||
non_background_mask = (seg_img_flat != 255)
|
||||
non_support_mask = (seg_img_flat != support_id)
|
||||
target_mask = (seg_img_flat == target_id)
|
||||
points = np.stack([x_mat_flat, y_mat_flat, z_mat_flat], axis=1)
|
||||
all_points = points[non_background_mask]
|
||||
scene_points = points[non_background_mask & non_support_mask]
|
||||
target_points = points[target_mask]
|
||||
target_points = PtsUtil.random_downsample_point_cloud(target_points, target_sample_num)
|
||||
scene_points = PtsUtil.random_downsample_point_cloud(scene_points, scene_sample_num)
|
||||
target_points = target_points.reshape(1, target_points.shape[0], 3)
|
||||
# self.pcdvis.update_points(target_points)
|
||||
target_points = torch.from_numpy(target_points).float().to("cuda:0")
|
||||
|
||||
scene_points = scene_points.reshape(1, scene_points.shape[0], 3)
|
||||
@ -473,17 +376,3 @@ class ActivePerceptionSingleViewPolicy(SingleViewPolicy):
|
||||
all_points = torch.from_numpy(all_points).float().to("cuda:0")
|
||||
|
||||
return target_points, scene_points, all_points
|
||||
|
||||
def interpolate_point_cloud(self, points, num_new_points):
|
||||
# Fit NearestNeighbors on existing points
|
||||
nbrs = NearestNeighbors(n_neighbors=self.num_knn_neighbours).fit(points)
|
||||
interpolated_points = []
|
||||
|
||||
for _ in range(num_new_points):
|
||||
random_point = points[np.random.choice(len(points))]
|
||||
distances, indices = nbrs.kneighbors([random_point])
|
||||
neighbors = points[indices[0]]
|
||||
new_point = neighbors.mean(axis=0) # Average of neighbors
|
||||
interpolated_points.append(new_point)
|
||||
|
||||
return np.vstack([points, interpolated_points])
|
@ -44,6 +44,15 @@ class Visualizer(vgn.rviz.Visualizer):
|
||||
marker = create_line_list_marker(frame, pose, scale, color, lines, "bbox")
|
||||
self.draw([marker])
|
||||
|
||||
def unreachable_cam_views(self, frame, intrinsic, views):
|
||||
scale = [0.002, 0.0, 0.0]
|
||||
color = red
|
||||
markers = []
|
||||
for i, view in enumerate(views):
|
||||
marker = create_view_marker(frame, view, scale, color, intrinsic, 0.0, 0.02, ns="unreachable_views", id=i)
|
||||
markers.append(marker)
|
||||
self.draw(markers)
|
||||
|
||||
def ig_views(self, frame, intrinsic, views, values):
|
||||
vmin, vmax = min(values), max(values)
|
||||
scale = [0.002, 0.0, 0.0]
|
||||
|
117
src/active_grasp/utils/pts.py
Normal file
117
src/active_grasp/utils/pts.py
Normal file
@ -0,0 +1,117 @@
|
||||
import numpy as np
|
||||
import open3d as o3d
|
||||
import torch
|
||||
|
||||
class PtsUtil:
|
||||
|
||||
@staticmethod
|
||||
def voxel_downsample_point_cloud(point_cloud, voxel_size=0.005, require_idx=False):
|
||||
voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
|
||||
if require_idx:
|
||||
_, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True)
|
||||
idx_sort = np.argsort(inverse)
|
||||
idx_unique = idx_sort[np.cumsum(counts)-counts]
|
||||
downsampled_points = point_cloud[idx_unique]
|
||||
return downsampled_points, idx_unique
|
||||
else:
|
||||
unique_voxels = np.unique(voxel_indices, axis=0, return_inverse=True)
|
||||
return unique_voxels[0]*voxel_size
|
||||
|
||||
@staticmethod
|
||||
def voxel_downsample_point_cloud_random(point_cloud, voxel_size=0.005, require_idx=False):
|
||||
voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
|
||||
unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True)
|
||||
idx_sort = np.argsort(inverse)
|
||||
idx_unique = idx_sort[np.cumsum(counts)-counts]
|
||||
downsampled_points = point_cloud[idx_unique]
|
||||
if require_idx:
|
||||
return downsampled_points, inverse
|
||||
return downsampled_points
|
||||
|
||||
@staticmethod
|
||||
def random_downsample_point_cloud(point_cloud, num_points, require_idx=False):
|
||||
if point_cloud.shape[0] == 0:
|
||||
if require_idx:
|
||||
return point_cloud, np.array([])
|
||||
return point_cloud
|
||||
idx = np.random.choice(len(point_cloud), num_points, replace=True)
|
||||
if require_idx:
|
||||
return point_cloud[idx], idx
|
||||
return point_cloud[idx]
|
||||
|
||||
@staticmethod
|
||||
def fps_downsample_point_cloud(point_cloud, num_points, require_idx=False):
|
||||
N = point_cloud.shape[0]
|
||||
mask = np.zeros(N, dtype=bool)
|
||||
|
||||
sampled_indices = np.zeros(num_points, dtype=int)
|
||||
sampled_indices[0] = np.random.randint(0, N)
|
||||
distances = np.linalg.norm(point_cloud - point_cloud[sampled_indices[0]], axis=1)
|
||||
for i in range(1, num_points):
|
||||
farthest_index = np.argmax(distances)
|
||||
sampled_indices[i] = farthest_index
|
||||
mask[farthest_index] = True
|
||||
|
||||
new_distances = np.linalg.norm(point_cloud - point_cloud[farthest_index], axis=1)
|
||||
distances = np.minimum(distances, new_distances)
|
||||
|
||||
sampled_points = point_cloud[sampled_indices]
|
||||
if require_idx:
|
||||
return sampled_points, sampled_indices
|
||||
return sampled_points
|
||||
|
||||
@staticmethod
|
||||
def random_downsample_point_cloud_tensor(point_cloud, num_points):
|
||||
idx = torch.randint(0, len(point_cloud), (num_points,))
|
||||
return point_cloud[idx]
|
||||
|
||||
@staticmethod
|
||||
def voxelize_points(points, voxel_size):
|
||||
voxel_indices = np.floor(points / voxel_size).astype(np.int32)
|
||||
unique_voxels = np.unique(voxel_indices, axis=0, return_inverse=True)
|
||||
return unique_voxels
|
||||
|
||||
@staticmethod
|
||||
def transform_point_cloud(points, pose_mat):
|
||||
points_h = np.concatenate([points, np.ones((points.shape[0], 1))], axis=1)
|
||||
points_h = np.dot(pose_mat, points_h.T).T
|
||||
return points_h[:, :3]
|
||||
|
||||
@staticmethod
|
||||
def get_overlapping_points(point_cloud_L, point_cloud_R, voxel_size=0.005, require_idx=False):
|
||||
voxels_L, indices_L = PtsUtil.voxelize_points(point_cloud_L, voxel_size)
|
||||
voxels_R, _ = PtsUtil.voxelize_points(point_cloud_R, voxel_size)
|
||||
|
||||
voxel_indices_L = voxels_L.view([("", voxels_L.dtype)] * 3)
|
||||
voxel_indices_R = voxels_R.view([("", voxels_R.dtype)] * 3)
|
||||
overlapping_voxels = np.intersect1d(voxel_indices_L, voxel_indices_R)
|
||||
mask_L = np.isin(
|
||||
indices_L, np.where(np.isin(voxel_indices_L, overlapping_voxels))[0]
|
||||
)
|
||||
overlapping_points = point_cloud_L[mask_L]
|
||||
if require_idx:
|
||||
return overlapping_points, mask_L
|
||||
return overlapping_points
|
||||
|
||||
@staticmethod
|
||||
def filter_points(points, normals, cam_pose, theta_limit=45, z_range=(0.2, 0.45)):
|
||||
|
||||
""" filter with normal """
|
||||
normals_normalized = normals / np.linalg.norm(normals, axis=1, keepdims=True)
|
||||
cos_theta = np.dot(normals_normalized, np.array([0, 0, 1]))
|
||||
theta = np.arccos(cos_theta) * 180 / np.pi
|
||||
idx = theta < theta_limit
|
||||
filtered_sampled_points = points[idx]
|
||||
filtered_normals = normals[idx]
|
||||
|
||||
""" filter with z range """
|
||||
points_cam = PtsUtil.transform_point_cloud(filtered_sampled_points, np.linalg.inv(cam_pose))
|
||||
idx = (points_cam[:, 2] > z_range[0]) & (points_cam[:, 2] < z_range[1])
|
||||
z_filtered_points = filtered_sampled_points[idx]
|
||||
z_filtered_normals = filtered_normals[idx]
|
||||
return z_filtered_points[:, :3], z_filtered_normals
|
||||
|
||||
@staticmethod
|
||||
def point_to_hash(point, voxel_size):
|
||||
return tuple(np.floor(point / voxel_size).astype(int))
|
||||
|
Loading…
x
Reference in New Issue
Block a user