5 Commits

SHA1       Message                          Date
81bf2678ac ablation study                   2025-04-28 06:16:03 +00:00
ad7a1c9cdf solve merge                      2025-04-11 20:10:56 +00:00
7c7f071f95 update                           2025-04-12 04:05:51 +08:00
1a0e3c8042 sim control                      2025-04-09 15:17:24 +08:00
be835aded4 finish partial_global inference  2024-11-26 15:40:00 +08:00
15 changed files with 834 additions and 74 deletions

View File

@@ -5,5 +5,7 @@ from runners.simulator import Simulator
 class SimulateApp:
     @staticmethod
     def start():
-        Simulator("configs/server/server_split_dataset_config.yaml").run()
+        simulator = Simulator("configs/local/simulation_config.yaml")
+        simulator.run("create")
+        simulator.run("simulate")

View File

@@ -70,7 +70,7 @@ module:
     global_feat: True
     feature_transform: False
   transformer_seq_encoder:
-    embed_dim: 256
+    embed_dim: 320
     num_heads: 4
     ffn_dim: 256
     num_layers: 3

View File

@@ -1,4 +1,3 @@
 runner:
   general:
     seed: 0
@@ -11,4 +10,27 @@ runner:
   simulation:
     robot:
-      displaytable:
+      urdf_path: "assets/franka_panda/panda.urdf"
+      initial_position: [0, 0, 0] # robot arm base position
+      initial_orientation: [0, 0, 0] # robot arm base orientation (Euler angles)
+    turntable:
+      radius: 0.3 # turntable radius (m)
+      height: 0.1 # turntable height
+      center_position: [0.8, 0, 0.4]
+    target:
+      obj_dir: /media/hofee/data/project/python/nbv_reconstruction/nbv_reconstruction/assets/object_meshes
+      obj_name: "google_scan-box_0185"
+      scale: 1.0 # scale factor
+      mass: 0.1 # mass (kg)
+      rgba_color: [0.8, 0.8, 0.8, 1.0] # target object color
+    camera:
+      width: 640
+      height: 480
+      fov: 40
+      near: 0.01
+      far: 5.0
+    displaytable:

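The new simulation block above is what the Simulator runner later reads through ConfigManager.get("simulation", ...). As a rough sanity check outside PytorchBoot, the same keys can be inspected with plain PyYAML — a minimal sketch, assuming the runner/simulation nesting and the configs/local/simulation_config.yaml path shown elsewhere in this diff:

```python
import yaml

# Minimal stand-in for ConfigManager.get("simulation", "camera"), assuming the
# nesting shown in the diff (simulation sits under the top-level runner key).
with open("configs/local/simulation_config.yaml") as f:
    cfg = yaml.safe_load(f)

camera = cfg["runner"]["simulation"]["camera"]
print(camera["width"], camera["height"], camera["fov"])  # 640 480 40
```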
View File

@@ -17,7 +17,7 @@ runner:
   plane_size: 10
   max_views: 512
   min_views: 128
-  random_view_ratio: 0.02
+  random_view_ratio: 0.002
   min_cam_table_included_degree: 20
   max_diag: 0.7
   min_diag: 0.01

View File

@@ -3,11 +3,11 @@ runner:
   general:
     seed: 0
     device: cuda
-    cuda_visible_devices: "0"
+    cuda_visible_devices: "2"
     parallel: False
   experiment:
-    name: train_ab_global_only_with_wp_p++_strong
+    name: newtrain_real_global_only
     root_dir: "experiments"
     use_checkpoint: False
     epoch: -1 # -1 stands for last epoch
@@ -28,18 +28,18 @@ runner:
     - OmniObject3d_test
     - OmniObject3d_val
-  pipeline: nbv_reconstruction_pipeline
+  pipeline: nbv_reconstruction_pipeline_global_only
 dataset:
   OmniObject3d_train:
     root_dir: "/data/hofee/data/new_full_data"
     model_dir: "../data/scaled_object_meshes"
     source: nbv_reconstruction_dataset
-    split_file: "/data/hofee/data/new_full_data_list/OmniObject3d_train.txt"
+    split_file: "/data/hofee/data/new_full_data_list/new_OmniObject3d_train.txt"
     type: train
     cache: True
     ratio: 1
-    batch_size: 64
+    batch_size: 24
     num_workers: 128
     pts_num: 8192
     load_from_preprocess: True
@@ -48,14 +48,14 @@ dataset:
     root_dir: "/data/hofee/data/new_full_data"
     model_dir: "../data/scaled_object_meshes"
     source: nbv_reconstruction_dataset
-    split_file: "/data/hofee/data/new_full_data_list/OmniObject3d_test.txt"
+    split_file: "/data/hofee/data/new_full_data_list/new_OmniObject3d_test.txt"
     type: test
     cache: True
     filter_degree: 75
     eval_list:
       - pose_diff
     ratio: 1
-    batch_size: 80
+    batch_size: 32
     num_workers: 12
     pts_num: 8192
     load_from_preprocess: True
@@ -64,21 +64,37 @@ dataset:
     root_dir: "/data/hofee/data/new_full_data"
     model_dir: "../data/scaled_object_meshes"
     source: nbv_reconstruction_dataset
-    split_file: "/data/hofee/data/new_full_data_list/OmniObject3d_train.txt"
+    split_file: "/data/hofee/data/new_full_data_list/new_OmniObject3d_train.txt"
     type: test
     cache: True
     filter_degree: 75
     eval_list:
       - pose_diff
     ratio: 0.1
-    batch_size: 80
+    batch_size: 32
     num_workers: 12
     pts_num: 8192
     load_from_preprocess: True
 pipeline:
-  nbv_reconstruction_pipeline:
+  nbv_reconstruction_pipeline_local:
+    modules:
+      pts_encoder: pointnet++_encoder
+      seq_encoder: transformer_seq_encoder
+      pose_encoder: pose_encoder
+      view_finder: gf_view_finder
+    eps: 1e-5
+    global_scanned_feat: True
+  nbv_reconstruction_pipeline_global:
+    modules:
+      pts_encoder: pointnet++_encoder
+      seq_encoder: transformer_seq_encoder
+      pose_encoder: pose_encoder
+      view_finder: gf_view_finder
+    eps: 1e-5
+    global_scanned_feat: True
+  nbv_reconstruction_pipeline_local_only:
     modules:
       pts_encoder: pointnet++_encoder
       seq_encoder: transformer_seq_encoder
@@ -98,10 +114,9 @@ module:
   pointnet++_encoder:
     in_dim: 3
-    params_name: strong
   transformer_seq_encoder:
-    embed_dim: 256
+    embed_dim: 1280
     num_heads: 4
     ffn_dim: 256
     num_layers: 3
@@ -110,7 +125,7 @@ module:
   gf_view_finder:
     t_feat_dim: 128
     pose_feat_dim: 256
-    main_feat_dim: 5120
+    main_feat_dim: 1024
     regression_head: Rx_Ry_and_T
     pose_mode: rot_matrix
     per_point_feature: False

View File

@@ -0,0 +1,81 @@
import torch
from torch import nn
import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory.component_factory import ComponentFactory
from PytorchBoot.utils import Log

@stereotype.pipeline("nbv_reconstruction_pipeline_global_only")
class NBVReconstructionGlobalPointsOnlyPipeline(nn.Module):
    def __init__(self, config):
        super(NBVReconstructionGlobalPointsOnlyPipeline, self).__init__()
        self.config = config
        self.module_config = config["modules"]
        self.pts_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pts_encoder"])
        self.pose_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pose_encoder"])
        self.view_finder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["view_finder"])
        self.eps = float(self.config["eps"])
        self.enable_global_scanned_feat = self.config["global_scanned_feat"]

    def forward(self, data):
        mode = data["mode"]
        if mode == namespace.Mode.TRAIN:
            return self.forward_train(data)
        elif mode == namespace.Mode.TEST:
            return self.forward_test(data)
        else:
            Log.error("Unknown mode: {}".format(mode), True)

    def pertube_data(self, gt_delta_9d):
        bs = gt_delta_9d.shape[0]
        random_t = torch.rand(bs, device=gt_delta_9d.device) * (1. - self.eps) + self.eps
        random_t = random_t.unsqueeze(-1)
        mu, std = self.view_finder.marginal_prob(gt_delta_9d, random_t)
        std = std.view(-1, 1)
        z = torch.randn_like(gt_delta_9d)
        perturbed_x = mu + z * std
        target_score = - z * std / (std ** 2)
        return perturbed_x, random_t, target_score, std

    def forward_train(self, data):
        main_feat = self.get_main_feat(data)
        ''' get std '''
        best_to_world_pose_9d_batch = data["best_to_world_pose_9d"]
        perturbed_x, random_t, target_score, std = self.pertube_data(best_to_world_pose_9d_batch)
        input_data = {
            "sampled_pose": perturbed_x,
            "t": random_t,
            "main_feat": main_feat,
        }
        estimated_score = self.view_finder(input_data)
        output = {
            "estimated_score": estimated_score,
            "target_score": target_score,
            "std": std
        }
        return output

    def forward_test(self, data):
        main_feat = self.get_main_feat(data)
        estimated_delta_rot_9d, in_process_sample = self.view_finder.next_best_view(main_feat)
        result = {
            "pred_pose_9d": estimated_delta_rot_9d,
            "in_process_sample": in_process_sample
        }
        return result

    def get_main_feat(self, data):
        combined_scanned_pts_batch = data['combined_scanned_pts']
        global_scanned_feat = self.pts_encoder.encode_points(combined_scanned_pts_batch)
        main_feat = global_scanned_feat
        if torch.isnan(main_feat).any():
            Log.error("nan in main_feat", True)
        return main_feat

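For reference, pertube_data above implements standard denoising score matching: with perturbed_x = mu + std * z, the score of N(mu, std²) at the perturbed sample is -(perturbed_x - mu) / std² = -z / std, which is exactly what target_score = -z * std / std² evaluates to. A minimal numeric sketch, where mu and std stand in for whatever view_finder.marginal_prob returns:

```python
import torch

# Sketch: verify that the score target used in pertube_data equals the
# analytic Gaussian score -(x_t - mu) / std**2 evaluated at x_t = mu + std*z.
mu = torch.randn(4, 9)              # stand-in for marginal_prob's mean
std = torch.rand(4, 1) + 0.1        # stand-in for marginal_prob's std
z = torch.randn_like(mu)
perturbed_x = mu + z * std
target_score = -z * std / (std ** 2)    # as in pertube_data
assert torch.allclose(target_score, -(perturbed_x - mu) / std ** 2, atol=1e-5)
```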
View File

@@ -0,0 +1,91 @@
import torch
from torch import nn
import PytorchBoot.namespace as namespace
import PytorchBoot.stereotype as stereotype
from PytorchBoot.factory.component_factory import ComponentFactory
from PytorchBoot.utils import Log

@stereotype.pipeline("nbv_reconstruction_pipeline_local_only")
class NBVReconstructionLocalPointsOnlyPipeline(nn.Module):
    def __init__(self, config):
        super(NBVReconstructionLocalPointsOnlyPipeline, self).__init__()
        self.config = config
        self.module_config = config["modules"]
        self.pts_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pts_encoder"])
        self.pose_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pose_encoder"])
        self.seq_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["seq_encoder"])
        self.view_finder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["view_finder"])
        self.eps = float(self.config["eps"])
        self.enable_global_scanned_feat = self.config["global_scanned_feat"]

    def forward(self, data):
        mode = data["mode"]
        if mode == namespace.Mode.TRAIN:
            return self.forward_train(data)
        elif mode == namespace.Mode.TEST:
            return self.forward_test(data)
        else:
            Log.error("Unknown mode: {}".format(mode), True)

    def pertube_data(self, gt_delta_9d):
        bs = gt_delta_9d.shape[0]
        random_t = torch.rand(bs, device=gt_delta_9d.device) * (1. - self.eps) + self.eps
        random_t = random_t.unsqueeze(-1)
        mu, std = self.view_finder.marginal_prob(gt_delta_9d, random_t)
        std = std.view(-1, 1)
        z = torch.randn_like(gt_delta_9d)
        perturbed_x = mu + z * std
        target_score = - z * std / (std ** 2)
        return perturbed_x, random_t, target_score, std

    def forward_train(self, data):
        main_feat = self.get_main_feat(data)
        ''' get std '''
        best_to_world_pose_9d_batch = data["best_to_world_pose_9d"]
        perturbed_x, random_t, target_score, std = self.pertube_data(best_to_world_pose_9d_batch)
        input_data = {
            "sampled_pose": perturbed_x,
            "t": random_t,
            "main_feat": main_feat,
        }
        estimated_score = self.view_finder(input_data)
        output = {
            "estimated_score": estimated_score,
            "target_score": target_score,
            "std": std
        }
        return output

    def forward_test(self, data):
        main_feat = self.get_main_feat(data)
        estimated_delta_rot_9d, in_process_sample = self.view_finder.next_best_view(main_feat)
        result = {
            "pred_pose_9d": estimated_delta_rot_9d,
            "in_process_sample": in_process_sample
        }
        return result

    def get_main_feat(self, data):
        scanned_pts_batch = data['scanned_pts']
        scanned_n_to_world_pose_9d_batch = data['scanned_n_to_world_pose_9d']
        device = next(self.parameters()).device
        feat_seq_list = []
        for scanned_pts, scanned_n_to_world_pose_9d in zip(scanned_pts_batch, scanned_n_to_world_pose_9d_batch):
            scanned_pts = scanned_pts.to(device)
            scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d.to(device)
            pts_feat = self.pts_encoder.encode_points(scanned_pts)
            pose_feat = self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d)
            seq_feat = torch.cat([pts_feat, pose_feat], dim=-1)
            feat_seq_list.append(seq_feat)
        main_feat = self.seq_encoder.encode_sequence(feat_seq_list)
        if torch.isnan(main_feat).any():
            Log.error("nan in main_feat", True)
        return main_feat

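A shape walk-through of get_main_feat may clarify why the training config above sets transformer_seq_encoder.embed_dim: 1280 for this pipeline family: the per-view feature fed to the sequence encoder is the concatenation of a point feature and a pose feature. The dimensions below are assumptions for illustration (1024-d point feature, 256-d pose feature), not values confirmed by the diff:

```python
import torch

# Hypothetical shape sketch for one batch element with S scanned views,
# assuming pts_encoder -> (S x 1024) and pose_encoder -> (S x 256).
S = 5
pts_feat = torch.randn(S, 1024)     # stand-in for pts_encoder.encode_points
pose_feat = torch.randn(S, 256)     # stand-in for pose_encoder.encode_pose
seq_feat = torch.cat([pts_feat, pose_feat], dim=-1)
print(seq_feat.shape)               # torch.Size([5, 1280]) -> matches embed_dim
```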
View File

@@ -6,7 +6,7 @@ from PytorchBoot.factory.component_factory import ComponentFactory
 from PytorchBoot.utils import Log
-@stereotype.pipeline("nbv_reconstruction_global_pts_pipeline")
+@stereotype.pipeline("nbv_reconstruction_pipeline_global")
 class NBVReconstructionGlobalPointsPipeline(nn.Module):
     def __init__(self, config):
         super(NBVReconstructionGlobalPointsPipeline, self).__init__()
@@ -14,7 +14,7 @@ class NBVReconstructionGlobalPointsPipeline(nn.Module):
         self.module_config = config["modules"]
         self.pts_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pts_encoder"])
         self.pose_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pose_encoder"])
-        self.pose_seq_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["pose_seq_encoder"])
+        self.seq_encoder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["seq_encoder"])
         self.view_finder = ComponentFactory.create(namespace.Stereotype.MODULE, self.module_config["view_finder"])
         self.eps = float(self.config["eps"])
         self.enable_global_scanned_feat = self.config["global_scanned_feat"]
@@ -73,13 +73,13 @@ class NBVReconstructionGlobalPointsPipeline(nn.Module):
         device = next(self.parameters()).device
-        pose_feat_seq_list = []
+        feat_seq_list = []
         for scanned_n_to_world_pose_9d in scanned_n_to_world_pose_9d_batch:
             scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d.to(device)
-            pose_feat_seq_list.append(self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d))
-        main_feat = self.pose_seq_encoder.encode_sequence(pose_feat_seq_list)
+            feat_seq_list.append(self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d))
+        main_feat = self.seq_encoder.encode_sequence(feat_seq_list)
         combined_scanned_pts_batch = data['combined_scanned_pts']

View File

@@ -5,7 +5,7 @@ import PytorchBoot.stereotype as stereotype
 from PytorchBoot.factory.component_factory import ComponentFactory
 from PytorchBoot.utils import Log
-@stereotype.pipeline("nbv_reconstruction_local_pts_pipeline")
+@stereotype.pipeline("nbv_reconstruction_pipeline_local")
 class NBVReconstructionLocalPointsPipeline(nn.Module):
     def __init__(self, config):
         super(NBVReconstructionLocalPointsPipeline, self).__init__()
@@ -70,23 +70,18 @@ class NBVReconstructionLocalPointsPipeline(nn.Module):
     def get_main_feat(self, data):
         scanned_pts_batch = data['scanned_pts']
         scanned_n_to_world_pose_9d_batch = data['scanned_n_to_world_pose_9d']
         device = next(self.parameters()).device
-        pts_feat_seq_list = []
-        pose_feat_seq_list = []
+        feat_seq_list = []
         for scanned_pts, scanned_n_to_world_pose_9d in zip(scanned_pts_batch, scanned_n_to_world_pose_9d_batch):
             scanned_pts = scanned_pts.to(device)
             scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d.to(device)
-            pts_feat_seq_list.append(self.pts_encoder.encode_points(scanned_pts))
-            pose_feat_seq_list.append(self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d))
-        main_feat = self.seq_encoder.encode_sequence(pts_feat_seq_list, pose_feat_seq_list)
+            pts_feat = self.pts_encoder.encode_points(scanned_pts)
+            pose_feat = self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d)
+            seq_feat = torch.cat([pts_feat, pose_feat], dim=-1)
+            feat_seq_list.append(seq_feat)
+        main_feat = self.seq_encoder.encode_sequence(feat_seq_list)
         if self.enable_global_scanned_feat:
             combined_scanned_pts_batch = data['combined_scanned_pts']

View File

@@ -4,10 +4,10 @@ import PytorchBoot.namespace as namespace
 import PytorchBoot.stereotype as stereotype
 from PytorchBoot.config import ConfigManager
 from PytorchBoot.utils.log_util import Log
 import torch
 import os
 import sys
+import time
 sys.path.append(r"/data/hofee/project/nbv_rec/nbv_reconstruction")
@@ -51,7 +51,7 @@ class NBVReconstructionDataset(BaseDataset):
             scene_name_list.append(scene_name)
         return scene_name_list
-    def get_datalist(self, bias=False):
+    def get_datalist(self):
         datalist = []
         for scene_name in self.scene_name_list:
             seq_num = DataLoadUtil.get_label_num(self.root_dir, scene_name)
@@ -80,18 +80,16 @@ class NBVReconstructionDataset(BaseDataset):
                 for data_pair in label_data["data_pairs"]:
                     scanned_views = data_pair[0]
                     next_best_view = data_pair[1]
-                    accept_probability = scanned_views[-1][1]
-                    if accept_probability > np.random.rand():
-                        datalist.append(
+                    datalist.append(
                         {
                             "scanned_views": scanned_views,
                             "next_best_view": next_best_view,
                             "seq_max_coverage_rate": max_coverage_rate,
                             "scene_name": scene_name,
                             "label_idx": seq_idx,
                             "scene_max_coverage_rate": scene_max_coverage_rate,
                         }
                     )
         return datalist
     def preprocess_cache(self):
@@ -117,8 +115,13 @@ class NBVReconstructionDataset(BaseDataset):
         except Exception as e:
             Log.error(f"Save cache failed: {e}")
-    def voxel_downsample_with_mask(self, pts, voxel_size):
-        pass
+    def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003):
+        voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
+        unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True)
+        idx_sort = np.argsort(inverse)
+        idx_unique = idx_sort[np.cumsum(counts) - counts]  # one representative point per voxel
+        downsampled_points = point_cloud[idx_unique]
+        return downsampled_points, inverse
     def __getitem__(self, index):
@@ -132,6 +135,9 @@ class NBVReconstructionDataset(BaseDataset):
             scanned_coverages_rate,
             scanned_n_to_world_pose,
         ) = ([], [], [])
+        #start_time = time.time()
+        start_indices = [0]
+        total_points = 0
         for view in scanned_views:
             frame_idx = view[0]
             coverage_rate = view[1]
@@ -153,8 +159,12 @@ class NBVReconstructionDataset(BaseDataset):
             n_to_world_trans = n_to_world_pose[:3, 3]
             n_to_world_9d = np.concatenate([n_to_world_6d, n_to_world_trans], axis=0)
             scanned_n_to_world_pose.append(n_to_world_9d)
+            total_points += len(downsampled_target_point_cloud)
+            start_indices.append(total_points)
+        #end_time = time.time()
+        #Log.info(f"load data time: {end_time - start_time}")
         nbv_idx, nbv_coverage_rate = nbv[0], nbv[1]
         nbv_path = DataLoadUtil.get_path(self.root_dir, scene_name, nbv_idx)
         cam_info = DataLoadUtil.load_cam_info(nbv_path)
@@ -167,14 +177,27 @@ class NBVReconstructionDataset(BaseDataset):
         best_to_world_9d = np.concatenate(
             [best_to_world_6d, best_to_world_trans], axis=0
         )
         combined_scanned_views_pts = np.concatenate(scanned_views_pts, axis=0)
-        voxel_downsampled_combined_scanned_pts_np = PtsUtil.voxel_downsample_point_cloud(combined_scanned_views_pts, 0.002)
-        random_downsampled_combined_scanned_pts_np = PtsUtil.random_downsample_point_cloud(voxel_downsampled_combined_scanned_pts_np, self.pts_num)
+        voxel_downsampled_combined_scanned_pts_np, inverse = self.voxel_downsample_with_mapping(combined_scanned_views_pts, 0.003)
+        random_downsampled_combined_scanned_pts_np, random_downsample_idx = PtsUtil.random_downsample_point_cloud(voxel_downsampled_combined_scanned_pts_np, self.pts_num, require_idx=True)
+        # all_idx_unique = np.arange(len(voxel_downsampled_combined_scanned_pts_np))
+        # all_random_downsample_idx = all_idx_unique[random_downsample_idx]
+        # scanned_pts_mask = []
+        # for idx, start_idx in enumerate(start_indices):
+        #     if idx == len(start_indices) - 1:
+        #         break
+        #     end_idx = start_indices[idx + 1]
+        #     view_inverse = inverse[start_idx:end_idx]
+        #     view_unique_downsampled_idx = np.unique(view_inverse)
+        #     view_unique_downsampled_idx_set = set(view_unique_downsampled_idx)
+        #     mask = np.array([idx in view_unique_downsampled_idx_set for idx in all_random_downsample_idx])
+        #     #scanned_pts_mask.append(mask)
         data_item = {
             "scanned_pts": np.asarray(scanned_views_pts, dtype=np.float32),  # Ndarray(S x Nv x 3)
             "combined_scanned_pts": np.asarray(random_downsampled_combined_scanned_pts_np, dtype=np.float32),  # Ndarray(N x 3)
+            #"scanned_pts_mask": np.asarray(scanned_pts_mask, dtype=np.bool),  # Ndarray(N)
             "scanned_coverage_rate": scanned_coverages_rate,  # List(S): Float, range(0, 1)
             "scanned_n_to_world_pose_9d": np.asarray(scanned_n_to_world_pose, dtype=np.float32),  # Ndarray(S x 9)
             "best_coverage_rate": nbv_coverage_rate,  # Float, range(0, 1)
@@ -200,7 +223,9 @@ class NBVReconstructionDataset(BaseDataset):
         collate_data["scanned_n_to_world_pose_9d"] = [
             torch.tensor(item["scanned_n_to_world_pose_9d"]) for item in batch
         ]
+        # collate_data["scanned_pts_mask"] = [
+        #     torch.tensor(item["scanned_pts_mask"]) for item in batch
+        # ]
         ''' ------ Fixed Length ------ '''
         collate_data["best_to_world_pose_9d"] = torch.stack(
@@ -209,12 +234,14 @@ class NBVReconstructionDataset(BaseDataset):
         collate_data["combined_scanned_pts"] = torch.stack(
             [torch.tensor(item["combined_scanned_pts"]) for item in batch]
         )
         for key in batch[0].keys():
             if key not in [
                 "scanned_pts",
                 "scanned_n_to_world_pose_9d",
                 "best_to_world_pose_9d",
                 "combined_scanned_pts",
+                "scanned_pts_mask",
             ]:
                 collate_data[key] = [item[key] for item in batch]
         return collate_data
@@ -230,10 +257,9 @@ if __name__ == "__main__":
     torch.manual_seed(seed)
     np.random.seed(seed)
     config = {
-        "root_dir": "/data/hofee/data/new_full_data",
-        "model_dir": "../data/scaled_object_meshes",
+        "root_dir": "/data/hofee/nbv_rec_part2_preprocessed",
         "source": "nbv_reconstruction_dataset",
-        "split_file": "/data/hofee/data/new_full_data_list/OmniObject3d_train.txt",
+        "split_file": "/data/hofee/data/sample.txt",
         "load_from_preprocess": True,
         "ratio": 0.5,
         "batch_size": 2,

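The new voxel_downsample_with_mapping keeps one representative point per occupied voxel and additionally returns inverse, which maps every input point to the index of its voxel. That mapping is what the commented-out mask-building code above uses to mark which downsampled points each view covers. A self-contained usage sketch on random data (the 200-point view slice is hypothetical):

```python
import numpy as np

# Sketch of how voxel_downsample_with_mapping's outputs fit together.
point_cloud = np.random.rand(1000, 3)
voxel_size = 0.003
voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True)
idx_sort = np.argsort(inverse)
idx_unique = idx_sort[np.cumsum(counts) - counts]   # one representative point per voxel
downsampled = point_cloud[idx_unique]               # row k corresponds to voxel k

# 'inverse' lets a per-view slice of the combined cloud be mapped back to the
# voxels (and thus downsampled points) it contributed to:
view_inverse = inverse[:200]                        # points of one hypothetical view
mask = np.zeros(len(downsampled), dtype=bool)
mask[np.unique(view_inverse)] = True                # downsampled points covered by that view
```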
View File

@@ -90,26 +90,51 @@ class NBVReconstructionPipeline(nn.Module):
         scanned_n_to_world_pose_9d_batch = data[
             "scanned_n_to_world_pose_9d"
         ]  # List(B): Tensor(S x 9)
-        scanned_pts_mask_batch = data["scanned_pts_mask"]  # List(B): Tensor(N)
+        scanned_pts_mask_batch = data["scanned_pts_mask"]  # List(B): Tensor(S x N)
         device = next(self.parameters()).device
         embedding_list_batch = []
         combined_scanned_pts_batch = data["combined_scanned_pts"]  # Tensor(B x N x 3)
-        global_scanned_feat = self.pts_encoder.encode_points(
-            combined_scanned_pts_batch, require_per_point_feat=False
-        )  # global_scanned_feat: Tensor(B x Dg)
+        global_scanned_feat, per_point_feat_batch = self.pts_encoder.encode_points(
+            combined_scanned_pts_batch, require_per_point_feat=True
+        )  # global_scanned_feat: Tensor(B x Dg)
+        batch_size = len(scanned_n_to_world_pose_9d_batch)
-        for scanned_n_to_world_pose_9d in scanned_n_to_world_pose_9d_batch:
-            scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d.to(device)  # Tensor(S x 9)
+        for i in range(batch_size):
+            seq_len = len(scanned_n_to_world_pose_9d_batch[i])
+            scanned_n_to_world_pose_9d = scanned_n_to_world_pose_9d_batch[i].to(device)  # Tensor(S x 9)
+            scanned_pts_mask = scanned_pts_mask_batch[i]  # Tensor(S x N)
+            per_point_feat = per_point_feat_batch[i]  # Tensor(N x Dp)
+            partial_point_feat_seq = []
+            for j in range(seq_len):
+                partial_per_point_feat = per_point_feat[scanned_pts_mask[j]]
+                if partial_per_point_feat.shape[0] == 0:
+                    partial_point_feat = torch.zeros(per_point_feat.shape[1], device=device)
+                else:
+                    partial_point_feat = torch.mean(partial_per_point_feat, dim=0)  # Tensor(Dp)
+                partial_point_feat_seq.append(partial_point_feat)
+            partial_point_feat_seq = torch.stack(partial_point_feat_seq, dim=0)  # Tensor(S x Dp)
             pose_feat_seq = self.pose_encoder.encode_pose(scanned_n_to_world_pose_9d)  # Tensor(S x Dp)
-            seq_embedding = pose_feat_seq
+            seq_embedding = torch.cat([partial_point_feat_seq, pose_feat_seq], dim=-1)
             embedding_list_batch.append(seq_embedding)  # List(B): Tensor(S x (Dp))
         seq_feat = self.seq_encoder.encode_sequence(embedding_list_batch)  # Tensor(B x Ds)
         main_feat = torch.cat([seq_feat, global_scanned_feat], dim=-1)  # Tensor(B x (Ds+Dg))
         if torch.isnan(main_feat).any():
+            for i in range(len(main_feat)):
+                if torch.isnan(main_feat[i]).any():
+                    scanned_pts_mask = scanned_pts_mask_batch[i]
+                    Log.info(f"scanned_pts_mask shape: {scanned_pts_mask.shape}")
+                    Log.info(f"scanned_pts_mask sum: {scanned_pts_mask.sum()}")
+                    import ipdb
+                    ipdb.set_trace()
             Log.error("nan in main_feat", True)
         return main_feat

View File

@@ -92,7 +92,8 @@ class Inferencer(Runner):
                 output = self.predict_sequence(data)
                 self.save_inference_result(test_set_name, data["scene_name"], output)
             except Exception as e:
-                Log.error(f"Error in scene {scene_name}, {e}")
+                print(e)
+                Log.error(f"Error, {e}")
                 continue
         status_manager.set_progress("inference", "inferencer", f"dataset", len(self.test_set_list), len(self.test_set_list))
@@ -116,7 +117,9 @@ class Inferencer(Runner):
         ''' data for inference '''
         input_data = {}
         input_data["combined_scanned_pts"] = torch.tensor(data["first_scanned_pts"][0], dtype=torch.float32).to(self.device).unsqueeze(0)
+        input_data["scanned_pts_mask"] = [torch.zeros(input_data["combined_scanned_pts"].shape[1], dtype=torch.bool).to(self.device).unsqueeze(0)]
         input_data["scanned_n_to_world_pose_9d"] = [torch.tensor(data["first_scanned_n_to_world_pose_9d"], dtype=torch.float32).to(self.device)]
         input_data["mode"] = namespace.Mode.TEST
         input_pts_N = input_data["combined_scanned_pts"].shape[1]
@@ -254,6 +257,14 @@ class Inferencer(Runner):
         return result
+    def voxel_downsample_with_mapping(self, point_cloud, voxel_size=0.003):
+        voxel_indices = np.floor(point_cloud / voxel_size).astype(np.int32)
+        unique_voxels, inverse, counts = np.unique(voxel_indices, axis=0, return_inverse=True, return_counts=True)
+        idx_sort = np.argsort(inverse)
+        idx_unique = idx_sort[np.cumsum(counts) - counts]
+        downsampled_points = point_cloud[idx_unique]
+        return downsampled_points, inverse
     def compute_coverage_rate(self, scanned_view_pts, new_pts, model_pts, threshold=0.005):
         if new_pts is not None:
             new_scanned_view_pts = scanned_view_pts + [new_pts]

View File

@@ -1,23 +1,456 @@
+import pybullet as p
+import pybullet_data
+import numpy as np
+import os
+import time
 from PytorchBoot.runners.runner import Runner
 import PytorchBoot.stereotype as stereotype
+from PytorchBoot.config import ConfigManager
+from utils.control import ControlUtil

 @stereotype.runner("simulator")
 class Simulator(Runner):
+    CREATE: str = "create"
+    SIMULATE: str = "simulate"
+    INIT_GRIPPER_POSE: np.ndarray = np.asarray(
+        [[0.41869126,  0.87596275,  0.23951774,  0.36005292],
+         [0.70787907, -0.4800251 ,  0.51813998, -0.40499909],
+         [0.56884584, -0.04739109, -0.82107382,  0.76881103],
+         [0.        ,  0.        ,  0.        ,  1.        ]])
+    TURNTABLE_WORLD_TO_PYBULLET_WORLD: np.ndarray = np.asarray(
+        [[1, 0, 0, 0.8],
+         [0, 1, 0, 0],
+         [0, 0, 1, 0.5],
+         [0, 0, 0, 1]])
+    debug_pose = np.asarray([
+        [0.992167055606842,   -0.10552699863910675,  0.06684812903404236, -0.07388903945684433],
+        [0.10134342312812805,  0.3670985698699951,  -0.9246448874473572,  -0.41582486033439636],
+        [0.07303514331579208,  0.9241767525672913,   0.37491756677627563,  1.0754833221435547],
+        [0.0,                  0.0,                  0.0,                  1.0]])
     def __init__(self, config_path):
         super().__init__(config_path)
         self.config_path = config_path
+        self.robot_id = None
+        self.turntable_id = None
+        self.target_id = None
+        camera_config = ConfigManager.get("simulation", "camera")
+        self.camera_params = {
+            'width': camera_config["width"],
+            'height': camera_config["height"],
+            'fov': camera_config["fov"],
+            'near': camera_config["near"],
+            'far': camera_config["far"]
+        }
+        self.sim_config = ConfigManager.get("simulation")
-    def run(self):
-        print()
+    def run(self, cmd):
+        print(f"Simulator run {cmd}")
+        if cmd == self.CREATE:
+            self.prepare_env()
+            self.create_env()
+        elif cmd == self.SIMULATE:
+            self.simulate()
+
+    def simulate(self):
+        self.reset()
+        self.init()
+        debug_pose = Simulator.debug_pose
+        offset = np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])
+        debug_pose = debug_pose @ offset
+        for _ in range(10000):
+            debug_pose_2 = np.eye(4)
+            debug_pose_2[0, 0] = -1
+            debug_pose_2[2, 3] = 0.5
+            self.move_to(debug_pose_2)
+            # wait for the system to stabilize
+            for _ in range(20):  # simulate 20 steps to ensure stability
+                p.stepSimulation()
+                time.sleep(0.001)  # small delay for the physics simulation
+            depth_img, segm_img = self.take_picture()
+            p.stepSimulation()
     def prepare_env(self):
-        pass
+        p.connect(p.GUI)
+        p.setAdditionalSearchPath(pybullet_data.getDataPath())
+        p.setGravity(0, 0, 0)
+        p.loadURDF("plane.urdf")
     def create_env(self):
-        pass
+        print(self.config)
+        robot_config = self.sim_config["robot"]
+        turntable_config = self.sim_config["turntable"]
+        target_config = self.sim_config["target"]
+        self.robot_id = p.loadURDF(
+            robot_config["urdf_path"],
+            robot_config["initial_position"],
+            p.getQuaternionFromEuler(robot_config["initial_orientation"]),
+            useFixedBase=True
+        )
+        p.changeDynamics(
+            self.robot_id,
+            linkIndex=-1,
+            mass=0,
+            linearDamping=0,
+            angularDamping=0,
+            lateralFriction=0
+        )
+        visual_shape_id = p.createVisualShape(
+            shapeType=p.GEOM_CYLINDER,
+            radius=turntable_config["radius"],
+            length=turntable_config["height"],
+            rgbaColor=[0.7, 0.7, 0.7, 1]
+        )
+        collision_shape_id = p.createCollisionShape(
+            shapeType=p.GEOM_CYLINDER,
+            radius=turntable_config["radius"],
+            height=turntable_config["height"]
+        )
+        self.turntable_id = p.createMultiBody(
+            baseMass=0,  # zero mass makes the body static
+            baseCollisionShapeIndex=collision_shape_id,
+            baseVisualShapeIndex=visual_shape_id,
+            basePosition=turntable_config["center_position"]
+        )
+        # disable the turntable's dynamics
+        p.changeDynamics(
+            self.turntable_id,
+            -1,  # -1 refers to the base
+            mass=0,
+            linearDamping=0,
+            angularDamping=0,
+            lateralFriction=0
+        )
+        obj_path = os.path.join(target_config["obj_dir"], target_config["obj_name"], "mesh.obj")
+        assert os.path.exists(obj_path), f"Error: File not found at {obj_path}"
+        # load the OBJ file as the target object
+        target_visual = p.createVisualShape(
+            shapeType=p.GEOM_MESH,
+            fileName=obj_path,
+            rgbaColor=target_config["rgba_color"],
+            specularColor=[0.4, 0.4, 0.4],
+            meshScale=[target_config["scale"]] * 3
+        )
+        # use a simplified collision shape
+        target_collision = p.createCollisionShape(
+            shapeType=p.GEOM_MESH,
+            fileName=obj_path,
+            meshScale=[target_config["scale"]] * 3,
+            flags=p.GEOM_FORCE_CONCAVE_TRIMESH  # try a concave trimesh
+        )
+        # create the target object
+        self.target_id = p.createMultiBody(
+            baseMass=0,  # zero mass makes the body static
+            baseCollisionShapeIndex=target_collision,
+            baseVisualShapeIndex=target_visual,
+            basePosition=[
+                turntable_config["center_position"][0],
+                turntable_config["center_position"][1],
+                turntable_config["height"] + turntable_config["center_position"][2]
+            ],
+            baseOrientation=p.getQuaternionFromEuler([np.pi / 2, 0, 0])
+        )
+        # disable the target object's dynamics
+        p.changeDynamics(
+            self.target_id,
+            -1,  # -1 refers to the base
+            mass=0,
+            linearDamping=0,
+            angularDamping=0,
+            lateralFriction=0
+        )
+        # create a fixed constraint attaching the target to the turntable
+        cid = p.createConstraint(
+            parentBodyUniqueId=self.turntable_id,
+            parentLinkIndex=-1,  # -1 refers to the base
+            childBodyUniqueId=self.target_id,
+            childLinkIndex=-1,  # -1 refers to the base
+            jointType=p.JOINT_FIXED,
+            jointAxis=[0, 0, 0],
+            parentFramePosition=[0, 0, 0],  # offset relative to the turntable center
+            childFramePosition=[0, 0, 0]  # offset relative to the object center
+        )
+        # a max force keeps the constraint stable
+        p.changeConstraint(cid, maxForce=100)
+    def move_robot_to_pose(self, target_matrix):
+        # extract the position (first 3 elements of the last column) from the 4x4 homogeneous matrix
+        position = target_matrix[:3, 3]
+        # derive the orientation quaternion from the 3x3 rotation matrix
+        R = target_matrix[:3, :3]
+        # compute the quaternion w component
+        w = np.sqrt(max(0, 1 + R[0, 0] + R[1, 1] + R[2, 2])) / 2
+        # avoid division by zero and handle the degenerate case
+        if abs(w) < 1e-8:
+            # special case when w is close to 0
+            x = np.sqrt(max(0, 1 + R[0, 0] - R[1, 1] - R[2, 2])) / 2
+            y = np.sqrt(max(0, 1 - R[0, 0] + R[1, 1] - R[2, 2])) / 2
+            z = np.sqrt(max(0, 1 - R[0, 0] - R[1, 1] + R[2, 2])) / 2
+            # fix the signs
+            if R[2, 1] - R[1, 2] < 0: x = -x
+            if R[0, 2] - R[2, 0] < 0: y = -y
+            if R[1, 0] - R[0, 1] < 0: z = -z
+        else:
+            # regular case
+            x = (R[2, 1] - R[1, 2]) / (4 * w)
+            y = (R[0, 2] - R[2, 0]) / (4 * w)
+            z = (R[1, 0] - R[0, 1]) / (4 * w)
+        orientation = (x, y, z, w)
+        # set up IK solver parameters
+        num_joints = p.getNumJoints(self.robot_id)
+        lower_limits = []
+        upper_limits = []
+        joint_ranges = []
+        rest_poses = []
+        # collect joint limits and default rest poses
+        for i in range(num_joints):
+            joint_info = p.getJointInfo(self.robot_id, i)
+            lower_limits.append(joint_info[8])
+            upper_limits.append(joint_info[9])
+            joint_ranges.append(joint_info[9] - joint_info[8])
+            rest_poses.append(0)  # a better default rest pose could be set here
+        # use the extended IK solver, with joint limits, to account for collision avoidance
+        joint_poses = p.calculateInverseKinematics(
+            self.robot_id,
+            7,  # end effector link index
+            position,
+            orientation,
+            lowerLimits=lower_limits,
+            upperLimits=upper_limits,
+            jointRanges=joint_ranges,
+            restPoses=rest_poses,
+            maxNumIterations=100,
+            residualThreshold=1e-4
+        )
+        # move toward the target in steps, checking for collisions along the way
+        current_poses = [p.getJointState(self.robot_id, i)[0] for i in range(7)]
+        steps = 50  # move in 50 steps
+        for step in range(steps):
+            # linearly interpolate intermediate joint positions
+            intermediate_poses = []
+            for current, target in zip(current_poses, joint_poses):
+                t = (step + 1) / steps
+                intermediate = current + (target - current) * t
+                intermediate_poses.append(intermediate)
+            # set joint positions
+            for i in range(7):
+                p.setJointMotorControl2(
+                    self.robot_id,
+                    i,
+                    p.POSITION_CONTROL,
+                    intermediate_poses[i]
+                )
+            # step the simulation
+            p.stepSimulation()
+            # check for collisions
+            if p.getContactPoints(self.robot_id, self.turntable_id):
+                print("Potential collision detected, stopping motion")
+                return False
+        return True
+    def rotate_turntable(self, angle_degrees):
+        # rotate the turntable
+        current_pos, current_orn = p.getBasePositionAndOrientation(self.turntable_id)
+        current_orn = p.getEulerFromQuaternion(current_orn)
+        new_orn = list(current_orn)
+        new_orn[2] += np.radians(angle_degrees)
+        new_orn_quat = p.getQuaternionFromEuler(new_orn)
+        p.resetBasePositionAndOrientation(
+            self.turntable_id,
+            current_pos,
+            new_orn_quat
+        )
+        # rotate the target object along with it
+        target_pos, target_orn = p.getBasePositionAndOrientation(self.target_id)
+        target_orn = p.getEulerFromQuaternion(target_orn)
+        # update the target object's orientation
+        target_orn = list(target_orn)
+        target_orn[2] += np.radians(angle_degrees)
+        target_orn_quat = p.getQuaternionFromEuler(target_orn)
+        # compute the object's new position (rotating about the turntable center)
+        turntable_center = current_pos
+        relative_pos = np.array(target_pos) - np.array(turntable_center)
+        # build the rotation matrix
+        theta = np.radians(angle_degrees)
+        rotation_matrix = np.array([
+            [np.cos(theta), -np.sin(theta), 0],
+            [np.sin(theta), np.cos(theta), 0],
+            [0, 0, 1]
+        ])
+        # compute the new relative position
+        new_relative_pos = rotation_matrix.dot(relative_pos)
+        new_pos = np.array(turntable_center) + new_relative_pos
+        # update the target object's position and orientation
+        p.resetBasePositionAndOrientation(
+            self.target_id,
+            new_pos,
+            target_orn_quat
+        )
+    def get_camera_pose(self):
+        end_effector_link = 7  # link index of the Franka end effector
+        state = p.getLinkState(self.robot_id, end_effector_link)
+        ee_pos = state[0]  # position in world coordinates
+        camera_orn = state[1]  # orientation in world coordinates (quaternion)
+        # compute the camera view matrix
+        rot_matrix = p.getMatrixFromQuaternion(camera_orn)
+        rot_matrix = np.array(rot_matrix).reshape(3, 3)
+        # the camera's forward vector is aligned with the end effector's z-axis
+        camera_forward = rot_matrix.dot(np.array([0, 0, 1]))  # z-axis direction
+        # shift the camera position 0.12 m forward
+        offset = 0.12
+        camera_pos = np.array(ee_pos) + camera_forward * offset
+        camera_target = camera_pos + camera_forward
+        # the camera's up vector is aligned with the end effector's x-axis
+        camera_up = rot_matrix.dot(np.array([1, 0, 0]))  # x-axis direction
+        return camera_pos, camera_target, camera_up
+    def take_picture(self):
+        camera_pos, camera_target, camera_up = self.get_camera_pose()
+        view_matrix = p.computeViewMatrix(
+            cameraEyePosition=camera_pos,
+            cameraTargetPosition=camera_target,
+            cameraUpVector=camera_up
+        )
+        projection_matrix = p.computeProjectionMatrixFOV(
+            fov=self.camera_params['fov'],
+            aspect=self.camera_params['width'] / self.camera_params['height'],
+            nearVal=self.camera_params['near'],
+            farVal=self.camera_params['far']
+        )
+        _, _, rgb_img, depth_img, segm_img = p.getCameraImage(
+            width=self.camera_params['width'],
+            height=self.camera_params['height'],
+            viewMatrix=view_matrix,
+            projectionMatrix=projection_matrix,
+            renderer=p.ER_BULLET_HARDWARE_OPENGL
+        )
+        # convert the non-linear depth buffer to metric depth
+        depth_img = self.camera_params['far'] * self.camera_params['near'] / (
+            self.camera_params['far'] - (self.camera_params['far'] - self.camera_params['near']) * depth_img)
+        depth_img = np.array(depth_img)
+        segm_img = np.array(segm_img)
+        return depth_img, segm_img
+    def reset(self):
+        target_pos = [0.5, 0, 1]
+        target_orn = p.getQuaternionFromEuler([np.pi, 0, 0])
+        target_matrix = np.eye(4)
+        target_matrix[:3, 3] = target_pos
+        target_matrix[:3, :3] = np.asarray(p.getMatrixFromQuaternion(target_orn)).reshape(3, 3)
+        self.move_robot_to_pose(target_matrix)
+
+    def init(self):
+        self.move_to(Simulator.INIT_GRIPPER_POSE)
+
+    def move_to(self, pose: np.ndarray):
+        #delta_degree, min_new_cam_to_world = ControlUtil.solve_display_table_rot_and_cam_to_world(pose)
+        #print(delta_degree)
+        min_new_cam_to_pybullet_world = Simulator.TURNTABLE_WORLD_TO_PYBULLET_WORLD @ pose
+        self.move_to_cam_pose(min_new_cam_to_pybullet_world)
+        #self.rotate_turntable(delta_degree)
+
+    def __del__(self):
+        p.disconnect()
     def create_experiment(self, backup_name=None):
         return super().create_experiment(backup_name)

     def load_experiment(self, backup_name=None):
         super().load_experiment(backup_name)
+    def move_to_cam_pose(self, camera_pose: np.ndarray):
+        # extract position and rotation from the camera pose matrix
+        camera_pos = camera_pose[:3, 3]
+        R_camera = camera_pose[:3, :3]
+        # the camera's forward vector (z-axis)
+        forward = R_camera[:, 2]
+        # the camera has a fixed offset from the end effector, so compute the gripper position:
+        # the camera sits 0.12 m in front of the end effector
+        gripper_pos = camera_pos - forward * 0.12
+        # the gripper rotation must account for the fixed camera-to-gripper transform;
+        # assume the camera forward maps to the gripper z-axis and the camera x-axis to the gripper x-axis
+        R_gripper = R_camera
+        # build the 4x4 homogeneous transform
+        gripper_pose = np.eye(4)
+        gripper_pose[:3, :3] = R_gripper
+        gripper_pose[:3, 3] = gripper_pos
+        print(gripper_pose)
+        # move the robot to the computed pose
+        return self.move_robot_to_pose(gripper_pose)

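The hand-rolled matrix-to-quaternion conversion in move_robot_to_pose can be cross-checked against SciPy, which uses the same (x, y, z, w) ordering as PyBullet. A quick sketch covering only the regular branch (w not near zero); quaternions are sign-ambiguous, hence the two comparisons:

```python
import numpy as np
from scipy.spatial.transform import Rotation

# Sanity-check the regular branch of the conversion used in move_robot_to_pose.
Rm = Rotation.random().as_matrix()
w = np.sqrt(max(0, 1 + Rm[0, 0] + Rm[1, 1] + Rm[2, 2])) / 2
x = (Rm[2, 1] - Rm[1, 2]) / (4 * w)
y = (Rm[0, 2] - Rm[2, 0]) / (4 * w)
z = (Rm[1, 0] - Rm[0, 1]) / (4 * w)
q = np.array([x, y, z, w])
q_ref = Rotation.from_matrix(Rm).as_quat()   # (x, y, z, w) ordering
assert np.allclose(q, q_ref, atol=1e-6) or np.allclose(q, -q_ref, atol=1e-6)
```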
utils/control.py · Normal file · 59 lines
View File

@@ -0,0 +1,59 @@
import numpy as np
from scipy.spatial.transform import Rotation as R
import time

class ControlUtil:
    curr_rotation = 0

    @staticmethod
    def check_limit(new_cam_to_world):
        if new_cam_to_world[0, 3] < 0 or new_cam_to_world[1, 3] > 0:
            # if new_cam_to_world[0,3] > 0:
            return False
        x = abs(new_cam_to_world[0, 3])
        y = abs(new_cam_to_world[1, 3])
        tan_y_x = y / x
        min_angle = 0 / 180 * np.pi
        max_angle = 90 / 180 * np.pi
        if tan_y_x < np.tan(min_angle) or tan_y_x > np.tan(max_angle):
            return False
        return True

    @staticmethod
    def solve_display_table_rot_and_cam_to_world(cam_to_world: np.ndarray) -> tuple:
        if ControlUtil.check_limit(cam_to_world):
            return 0, cam_to_world
        else:
            min_display_table_rot = 180
            min_new_cam_to_world = None
            for display_table_rot in np.linspace(0.1, 360, 1800):
                new_world_to_world = ControlUtil.get_z_axis_rot_mat(display_table_rot)
                new_cam_to_new_world = cam_to_world
                new_cam_to_world = new_world_to_world @ new_cam_to_new_world
                if ControlUtil.check_limit(new_cam_to_world):
                    if display_table_rot < min_display_table_rot:
                        min_display_table_rot, min_new_cam_to_world = display_table_rot, new_cam_to_world
                    if abs(display_table_rot - 360) < min_display_table_rot:
                        min_display_table_rot, min_new_cam_to_world = display_table_rot - 360, new_cam_to_world
            if min_new_cam_to_world is None:
                raise ValueError("No valid display table rotation found")
            delta_degree = min_display_table_rot - ControlUtil.curr_rotation
            ControlUtil.curr_rotation = min_display_table_rot
            return delta_degree, min_new_cam_to_world

    @staticmethod
    def get_z_axis_rot_mat(degree):
        radian = np.radians(degree)
        return np.array([
            [np.cos(radian), -np.sin(radian), 0, 0],
            [np.sin(radian), np.cos(radian), 0, 0],
            [0, 0, 1, 0],
            [0, 0, 0, 1]
        ])

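Usage sketch for the new ControlUtil: solve_display_table_rot_and_cam_to_world brute-forces z-axis rotations of the display table (0.1° to 360° in roughly 0.2° steps) and returns the smallest one that brings the camera pose into the region check_limit accepts (x ≥ 0, y ≤ 0 in the turntable frame). The pose below is a made-up example, assuming the utils/control.py module layout from this diff:

```python
import numpy as np
from utils.control import ControlUtil

# A camera at x < 0 fails check_limit, so the solver must rotate the display
# table until the pose lands in the allowed quadrant.
cam_to_world = np.eye(4)
cam_to_world[:3, 3] = [-0.5, -0.3, 0.6]
delta_deg, fixed_pose = ControlUtil.solve_display_table_rot_and_cam_to_world(cam_to_world)
print(round(delta_deg, 1))                  # ~59.1 degrees for this pose
assert ControlUtil.check_limit(fixed_pose)  # rotated pose is now admissible
```

Note that curr_rotation is class-level state: the returned delta is relative to the last solved rotation, so repeated calls accumulate.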
View File

@@ -70,7 +70,7 @@ class RenderUtil:
     @staticmethod
     def render_pts(cam_pose, scene_path, script_path, scan_points, voxel_threshold=0.005, filter_degree=75, nO_to_nL_pose=None, require_full_scene=False):
+        import ipdb; ipdb.set_trace()
         nO_to_world_pose = DataLoadUtil.get_real_cam_O_from_cam_L(cam_pose, nO_to_nL_pose, scene_path=scene_path)