Re-sync with internal repository

This commit is contained in:
Nikhila Ravi
2020-03-18 10:35:27 -07:00
parent 2480723adf
commit 3d3b2fdc46
7 changed files with 2805 additions and 0 deletions

30
tests/bm_pointclouds.py Normal file
View File

@@ -0,0 +1,30 @@
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
from itertools import product
from fvcore.common.benchmark import benchmark
from test_pointclouds import TestPointclouds
def bm_compute_packed_padded_pointclouds() -> None:
    """Benchmark packed and padded computation over a grid of cloud
    counts, cloud sizes and feature dimensions."""
    num_clouds = [32, 128]
    max_p = [100, 10000]
    feats = [1, 10, 300]
    # One kwargs dict per (num_clouds, max_p, features) combination.
    kwargs_list = [
        {"num_clouds": n, "max_p": p, "features": f}
        for n, p, f in product(num_clouds, max_p, feats)
    ]
    benchmark(
        TestPointclouds.compute_packed_with_init,
        "COMPUTE_PACKED",
        kwargs_list,
        warmup_iters=1,
    )
    benchmark(
        TestPointclouds.compute_padded_with_init,
        "COMPUTE_PADDED",
        kwargs_list,
        warmup_iters=1,
    )

View File

@@ -0,0 +1,52 @@
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
import torch
from fvcore.common.benchmark import benchmark
from pytorch3d.renderer.points.rasterize_points import (
rasterize_points,
rasterize_points_python,
)
from pytorch3d.structures.pointclouds import Pointclouds
def _bm_python_with_init(N, P, img_size=32, radius=0.1, pts_per_pxl=3):
    """Build a random cloud, return a closure running the pure-Python
    rasterizer (setup cost excluded from the benchmark)."""
    torch.manual_seed(231)
    clouds = Pointclouds(points=torch.randn(N, P, 3))
    return lambda: rasterize_points_python(
        clouds, img_size, radius, pts_per_pxl
    )
def _bm_cpu_with_init(N, P, img_size=32, radius=0.1, pts_per_pxl=3):
    """Build a random CPU cloud, return a closure running the C++/CPU
    rasterizer (setup cost excluded from the benchmark)."""
    torch.manual_seed(231)
    clouds = Pointclouds(points=torch.randn(N, P, 3))
    return lambda: rasterize_points(clouds, img_size, radius, pts_per_pxl)
def _bm_cuda_with_init(N, P, img_size=32, radius=0.1, pts_per_pxl=3):
    """Build a random CUDA cloud, return a closure running the CUDA
    rasterizer (setup cost excluded from the benchmark)."""
    torch.manual_seed(231)
    pts = torch.randn(N, P, 3, device=torch.device("cuda"))
    clouds = Pointclouds(points=pts)
    return lambda: rasterize_points(clouds, img_size, radius, pts_per_pxl)
def bm_python_vs_cpu() -> None:
    """Benchmark python vs CPU rasterization on tiny inputs, then CPU vs
    CUDA on larger workloads."""
    small_cases = [
        {"N": 1, "P": 32, "img_size": 32, "radius": 0.1, "pts_per_pxl": 3},
        {"N": 2, "P": 32, "img_size": 32, "radius": 0.1, "pts_per_pxl": 3},
    ]
    benchmark(
        _bm_python_with_init, "RASTERIZE_PYTHON", small_cases, warmup_iters=1
    )
    benchmark(_bm_cpu_with_init, "RASTERIZE_CPU", small_cases, warmup_iters=1)
    # Larger workloads: the python reference would be too slow here.
    large_cases = [
        {"N": 2, "P": 32, "img_size": 32, "radius": 0.1, "pts_per_pxl": 3},
        {"N": 4, "P": 1024, "img_size": 128, "radius": 0.05, "pts_per_pxl": 5},
    ]
    benchmark(_bm_cpu_with_init, "RASTERIZE_CPU", large_cases, warmup_iters=1)
    benchmark(_bm_cuda_with_init, "RASTERIZE_CUDA", large_cases, warmup_iters=1)

978
tests/test_pointclouds.py Normal file
View File

@@ -0,0 +1,978 @@
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
import numpy as np
import unittest
import torch
from pytorch3d.structures.pointclouds import Pointclouds
from common_testing import TestCaseMixin
class TestPointclouds(TestCaseMixin, unittest.TestCase):
def setUp(self) -> None:
    """Seed both RNGs so every test sees identical random data."""
    seed = 42
    np.random.seed(seed)
    torch.manual_seed(seed)
@staticmethod
def init_cloud(
    num_clouds: int = 3,
    max_points: int = 100,
    channels: int = 4,
    lists_to_tensors: bool = False,
    with_normals: bool = True,
    with_features: bool = True,
):
    """
    Function to generate a Pointclouds object of `num_clouds` clouds
    with a random number of points each, on device cuda:0.

    Args:
        num_clouds: Number of clouds to generate.
        channels: Number of features.
        max_points: Max number of points per cloud.
        lists_to_tensors: Determines whether the generated clouds should be
            constructed from lists (=False) or
            tensors (=True) of points/normals/features.
        with_normals: bool whether to include normals
        with_features: bool whether to include features

    Returns:
        Pointclouds object.
    """
    device = torch.device("cuda:0")
    # Random number of points per cloud (may be zero).
    p = torch.randint(max_points, size=(num_clouds,))
    if lists_to_tensors:
        # Stacking into a single tensor requires equal-sized clouds.
        p.fill_(p[0])
    points_list = [
        torch.rand((i, 3), device=device, dtype=torch.float32) for i in p
    ]
    normals_list, features_list = None, None
    if with_normals:
        normals_list = [
            torch.rand((i, 3), device=device, dtype=torch.float32)
            for i in p
        ]
    if with_features:
        features_list = [
            torch.rand((i, channels), device=device, dtype=torch.float32)
            for i in p
        ]
    if lists_to_tensors:
        points_list = torch.stack(points_list)
        if with_normals:
            normals_list = torch.stack(normals_list)
        if with_features:
            features_list = torch.stack(features_list)
    return Pointclouds(
        points_list, normals=normals_list, features=features_list
    )
def test_simple(self):
    """Hand-built batch of three clouds (3, 4 and 5 points): check the
    packed/padded index helpers against known expected values."""
    device = torch.device("cuda:0")
    points = [
        torch.tensor(
            [[0.1, 0.3, 0.5], [0.5, 0.2, 0.1], [0.6, 0.8, 0.7]],
            dtype=torch.float32,
            device=device,
        ),
        torch.tensor(
            [
                [0.1, 0.3, 0.3],
                [0.6, 0.7, 0.8],
                [0.2, 0.3, 0.4],
                [0.1, 0.5, 0.3],
            ],
            dtype=torch.float32,
            device=device,
        ),
        torch.tensor(
            [
                [0.7, 0.3, 0.6],
                [0.2, 0.4, 0.8],
                [0.9, 0.5, 0.2],
                [0.2, 0.3, 0.4],
                [0.9, 0.3, 0.8],
            ],
            dtype=torch.float32,
            device=device,
        ),
    ]
    clouds = Pointclouds(points)
    # Each packed point maps back to the cloud it came from.
    self.assertClose(
        (clouds.packed_to_cloud_idx()).cpu(),
        torch.tensor([0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2]),
    )
    # First packed index of each cloud (cumulative sum of sizes).
    self.assertClose(
        clouds.cloud_to_packed_first_idx().cpu(), torch.tensor([0, 3, 7])
    )
    self.assertClose(
        clouds.num_points_per_cloud().cpu(), torch.tensor([3, 4, 5])
    )
    # Padded rows are 5 wide (largest cloud); these are the flat indices
    # of the real (non-padding) entries in the padded tensor.
    self.assertClose(
        clouds.padded_to_packed_idx().cpu(),
        torch.tensor([0, 1, 2, 5, 6, 7, 8, 10, 11, 12, 13, 14]),
    )
def test_all_constructions(self):
    """Construct Pointclouds from list and padded-tensor inputs, with
    and without normals/features, checking every public getter; invalid
    input combinations must raise ValueError."""
    public_getters = [
        "points_list",
        "points_packed",
        "packed_to_cloud_idx",
        "cloud_to_packed_first_idx",
        "num_points_per_cloud",
        "points_padded",
        "padded_to_packed_idx",
    ]
    public_normals_getters = [
        "normals_list",
        "normals_packed",
        "normals_padded",
    ]
    public_features_getters = [
        "features_list",
        "features_packed",
        "features_padded",
    ]
    lengths = [3, 4, 2]
    max_len = max(lengths)
    C = 4
    # Padded-style inputs: all tensors max_len long, zeroed past each
    # cloud's true length below.
    points_data = [torch.zeros((max_len, 3)).uniform_() for i in lengths]
    normals_data = [torch.zeros((max_len, 3)).uniform_() for i in lengths]
    features_data = [torch.zeros((max_len, C)).uniform_() for i in lengths]
    for length, p, n, f in zip(
        lengths, points_data, normals_data, features_data
    ):
        p[length:] = 0.0
        n[length:] = 0.0
        f[length:] = 0.0
    # List-style inputs: trim each padded tensor to its true length.
    points_list = [d[:length] for length, d in zip(lengths, points_data)]
    normals_list = [d[:length] for length, d in zip(lengths, normals_data)]
    features_list = [
        d[:length] for length, d in zip(lengths, features_data)
    ]
    # Packed tensors: invalid as constructor input (used below).
    points_packed = torch.cat(points_data)
    normals_packed = torch.cat(normals_data)
    features_packed = torch.cat(features_data)
    test_cases_inputs = [
        ("list_0_0", points_list, None, None),
        ("list_1_0", points_list, normals_list, None),
        ("list_0_1", points_list, None, features_list),
        ("list_1_1", points_list, normals_list, features_list),
        ("padded_0_0", points_data, None, None),
        ("padded_1_0", points_data, normals_data, None),
        ("padded_0_1", points_data, None, features_data),
        ("padded_1_1", points_data, normals_data, features_data),
        ("emptylist_emptylist_emptylist", [], [], []),
    ]
    false_cases_inputs = [
        (
            "list_packed",
            points_list,
            normals_packed,
            features_packed,
            ValueError,
        ),
        ("packed_0", points_packed, None, None, ValueError),
    ]
    for name, points, normals, features in test_cases_inputs:
        with self.subTest(name=name):
            p = Pointclouds(points, normals, features)
            for method in public_getters:
                self.assertIsNotNone(getattr(p, method)())
            # Optional getters return None when the input was absent
            # or the batch is empty.
            for method in public_normals_getters:
                if normals is None or p.isempty():
                    self.assertIsNone(getattr(p, method)())
            for method in public_features_getters:
                if features is None or p.isempty():
                    self.assertIsNone(getattr(p, method)())
    for name, points, normals, features, error in false_cases_inputs:
        with self.subTest(name=name):
            with self.assertRaises(error):
                Pointclouds(points, normals, features)
def test_simple_random_clouds(self):
    """Random clouds in every construction mode: padded tensors must
    match the per-cloud lists (zero-filled past each cloud's length)
    and packed tensors must concatenate the lists in order."""
    # Define the test object either from lists or tensors.
    for with_normals in (False, True):
        for with_features in (False, True):
            for lists_to_tensors in (False, True):
                N = 10
                cloud = self.init_cloud(
                    N,
                    lists_to_tensors=lists_to_tensors,
                    with_normals=with_normals,
                    with_features=with_features,
                )
                points_list = cloud.points_list()
                normals_list = cloud.normals_list()
                features_list = cloud.features_list()
                # Check batch calculations.
                points_padded = cloud.points_padded()
                normals_padded = cloud.normals_padded()
                features_padded = cloud.features_padded()
                points_per_cloud = cloud.num_points_per_cloud()
                if not with_normals:
                    self.assertIsNone(normals_list)
                    self.assertIsNone(normals_padded)
                if not with_features:
                    self.assertIsNone(features_list)
                    self.assertIsNone(features_padded)
                for n in range(N):
                    p = points_list[n].shape[0]
                    self.assertClose(
                        points_padded[n, :p, :], points_list[n]
                    )
                    if with_normals:
                        norms = normals_list[n].shape[0]
                        self.assertEqual(p, norms)
                        self.assertClose(
                            normals_padded[n, :p, :], normals_list[n]
                        )
                    if with_features:
                        f = features_list[n].shape[0]
                        self.assertEqual(p, f)
                        self.assertClose(
                            features_padded[n, :p, :], features_list[n]
                        )
                    # Padding region beyond each cloud must stay zero.
                    if points_padded.shape[1] > p:
                        self.assertTrue(points_padded[n, p:, :].eq(0).all())
                        if with_features:
                            self.assertTrue(
                                features_padded[n, p:, :].eq(0).all()
                            )
                    self.assertEqual(points_per_cloud[n], p)
                # Check compute packed.
                points_packed = cloud.points_packed()
                packed_to_cloud = cloud.packed_to_cloud_idx()
                cloud_to_packed = cloud.cloud_to_packed_first_idx()
                normals_packed = cloud.normals_packed()
                features_packed = cloud.features_packed()
                if not with_normals:
                    self.assertIsNone(normals_packed)
                if not with_features:
                    self.assertIsNone(features_packed)
                # Walk the packed tensors cloud by cloud.
                cur = 0
                for n in range(N):
                    p = points_list[n].shape[0]
                    self.assertClose(
                        points_packed[cur : cur + p, :], points_list[n]
                    )
                    if with_normals:
                        self.assertClose(
                            normals_packed[cur : cur + p, :],
                            normals_list[n],
                        )
                    if with_features:
                        self.assertClose(
                            features_packed[cur : cur + p, :],
                            features_list[n],
                        )
                    self.assertTrue(
                        packed_to_cloud[cur : cur + p].eq(n).all()
                    )
                    self.assertTrue(cloud_to_packed[n] == cur)
                    cur += p
def test_allempty(self):
    """A Pointclouds built from empty lists has zero clouds, empty point
    tensors and None for every optional attribute."""
    clouds = Pointclouds([], [])
    self.assertEqual(len(clouds), 0)
    # Optional attributes are None in every representation.
    for getter in (
        clouds.normals_list,
        clouds.features_list,
        clouds.normals_padded,
        clouds.features_padded,
        clouds.normals_packed,
        clouds.features_packed,
    ):
        self.assertIsNone(getter())
    # Point tensors exist but are empty along the leading dimension.
    self.assertEqual(clouds.points_padded().shape[0], 0)
    self.assertEqual(clouds.points_packed().shape[0], 0)
def test_empty(self):
    """Batches where some clouds are empty: padded tensors must still
    line up with the per-cloud lists and padding must stay zero."""
    N, P, C = 10, 100, 2
    device = torch.device("cuda:0")
    points_list = []
    normals_list = []
    features_list = []
    # Randomly mark each cloud as non-empty (valid) or empty.
    valid = torch.randint(2, size=(N,), dtype=torch.uint8, device=device)
    for n in range(N):
        if valid[n]:
            p = torch.randint(
                3, high=P, size=(1,), dtype=torch.int32, device=device
            )[0]
            points = torch.rand((p, 3), dtype=torch.float32, device=device)
            normals = torch.rand((p, 3), dtype=torch.float32, device=device)
            features = torch.rand(
                (p, C), dtype=torch.float32, device=device
            )
        else:
            points = torch.tensor([], dtype=torch.float32, device=device)
            normals = torch.tensor([], dtype=torch.float32, device=device)
            # Fix: use float32 to match the non-empty features above.
            # The original used int64, which is inconsistent with the
            # float feature tensors and breaks concatenation of the
            # feature lists into a packed tensor.
            features = torch.tensor([], dtype=torch.float32, device=device)
        points_list.append(points)
        normals_list.append(normals)
        features_list.append(features)
    for with_normals in (False, True):
        for with_features in (False, True):
            this_features, this_normals = None, None
            if with_normals:
                this_normals = normals_list
            if with_features:
                this_features = features_list
            clouds = Pointclouds(
                points=points_list,
                normals=this_normals,
                features=this_features,
            )
            points_padded = clouds.points_padded()
            normals_padded = clouds.normals_padded()
            features_padded = clouds.features_padded()
            if not with_normals:
                self.assertIsNone(normals_padded)
            if not with_features:
                self.assertIsNone(features_padded)
            points_per_cloud = clouds.num_points_per_cloud()
            for n in range(N):
                p = len(points_list[n])
                if p > 0:
                    self.assertClose(
                        points_padded[n, :p, :], points_list[n]
                    )
                    if with_normals:
                        self.assertClose(
                            normals_padded[n, :p, :], normals_list[n]
                        )
                    if with_features:
                        self.assertClose(
                            features_padded[n, :p, :], features_list[n]
                        )
                # Padding region beyond each cloud must stay zero.
                if points_padded.shape[1] > p:
                    self.assertTrue(points_padded[n, p:, :].eq(0).all())
                    if with_normals:
                        self.assertTrue(
                            normals_padded[n, p:, :].eq(0).all()
                        )
                    if with_features:
                        self.assertTrue(
                            features_padded[n, p:, :].eq(0).all()
                        )
                self.assertTrue(points_per_cloud[n] == p)
def test_clone_list(self):
    """Clone a list-constructed cloud (before and after forcing the
    packed computation) and verify the clone shares no tensors with the
    original while holding equal data."""
    N = 5
    clouds = self.init_cloud(N, 100, 5)
    for force in (False, True):
        if force:
            # Populate the cached packed representation so clone has
            # to copy it as well.
            clouds.points_packed()
        new_clouds = clouds.clone()
        # Check cloned and original objects do not share tensors.
        self.assertSeparate(
            new_clouds.points_list()[0], clouds.points_list()[0]
        )
        self.assertSeparate(
            new_clouds.normals_list()[0], clouds.normals_list()[0]
        )
        self.assertSeparate(
            new_clouds.features_list()[0], clouds.features_list()[0]
        )
        for attrib in [
            "points_packed",
            "normals_packed",
            "features_packed",
            "points_padded",
            "normals_padded",
            "features_padded",
        ]:
            self.assertSeparate(
                getattr(new_clouds, attrib)(), getattr(clouds, attrib)()
            )
        # Data must nevertheless be identical.
        self.assertCloudsEqual(clouds, new_clouds)
def test_clone_tensor(self):
    """Same as test_clone_list but for a tensor-constructed (equisized)
    cloud: clone must deep-copy every representation."""
    N = 5
    clouds = self.init_cloud(N, 100, 5, lists_to_tensors=True)
    for force in (False, True):
        if force:
            # Populate the cached packed representation so clone has
            # to copy it as well.
            clouds.points_packed()
        new_clouds = clouds.clone()
        # Check cloned and original objects do not share tensors.
        self.assertSeparate(
            new_clouds.points_list()[0], clouds.points_list()[0]
        )
        self.assertSeparate(
            new_clouds.normals_list()[0], clouds.normals_list()[0]
        )
        self.assertSeparate(
            new_clouds.features_list()[0], clouds.features_list()[0]
        )
        for attrib in [
            "points_packed",
            "normals_packed",
            "features_packed",
            "points_padded",
            "normals_padded",
            "features_padded",
        ]:
            self.assertSeparate(
                getattr(new_clouds, attrib)(), getattr(clouds, attrib)()
            )
        # Data must nevertheless be identical.
        self.assertCloudsEqual(clouds, new_clouds)
def assertCloudsEqual(self, cloud1, cloud2):
    """
    Assert that two Pointclouds hold identical data in every
    representation (list, padded, packed), plus matching index tensors,
    validity masks and equisized flags.
    """
    N = len(cloud1)
    self.assertEqual(N, len(cloud2))
    # Both clouds must agree on whether normals/features are present.
    has_normals = cloud1.normals_list() is not None
    self.assertTrue(has_normals == (cloud2.normals_list() is not None))
    has_features = cloud1.features_list() is not None
    self.assertTrue(has_features == (cloud2.features_list() is not None))
    for i in range(N):
        self.assertClose(cloud1.points_list()[i], cloud2.points_list()[i])
        # Fix: only index normals/features lists when they exist; the
        # original compared them unconditionally and crashed with a
        # TypeError for clouds built without normals or features.
        if has_normals:
            self.assertClose(
                cloud1.normals_list()[i], cloud2.normals_list()[i]
            )
        if has_features:
            self.assertClose(
                cloud1.features_list()[i], cloud2.features_list()[i]
            )
    # check padded & packed
    self.assertClose(cloud1.points_padded(), cloud2.points_padded())
    self.assertClose(cloud1.points_packed(), cloud2.points_packed())
    if has_normals:
        self.assertClose(cloud1.normals_padded(), cloud2.normals_padded())
        self.assertClose(cloud1.normals_packed(), cloud2.normals_packed())
    if has_features:
        self.assertClose(cloud1.features_padded(), cloud2.features_padded())
        self.assertClose(cloud1.features_packed(), cloud2.features_packed())
    # Index tensors and bookkeeping. (The original asserted
    # packed_to_cloud_idx twice; once is sufficient.)
    self.assertClose(
        cloud1.packed_to_cloud_idx(), cloud2.packed_to_cloud_idx()
    )
    self.assertClose(
        cloud1.cloud_to_packed_first_idx(),
        cloud2.cloud_to_packed_first_idx(),
    )
    self.assertClose(
        cloud1.num_points_per_cloud(), cloud2.num_points_per_cloud()
    )
    self.assertClose(
        cloud1.padded_to_packed_idx(), cloud2.padded_to_packed_idx()
    )
    self.assertTrue(all(cloud1.valid == cloud2.valid))
    self.assertTrue(cloud1.equisized == cloud2.equisized)
def test_offset(self):
    """Pointclouds.offset must translate each point by its per-point
    offset; compare against a naive packed-space implementation."""
    def naive_offset(clouds, offsets_packed):
        # Reference: add offsets in packed space, re-split into lists.
        new_points_packed = clouds.points_packed() + offsets_packed
        new_points_list = list(
            new_points_packed.split(
                clouds.num_points_per_cloud().tolist(), 0
            )
        )
        return Pointclouds(
            points=new_points_list,
            normals=clouds.normals_list(),
            features=clouds.features_list(),
        )
    N = 5
    clouds = self.init_cloud(N, 100, 10)
    all_p = clouds.points_packed().size(0)
    points_per_cloud = clouds.num_points_per_cloud()
    for force in (False, True):
        if force:
            # Populate cached representations so offset must refresh them.
            clouds._compute_packed(refresh=True)
            clouds._compute_padded()
            clouds.padded_to_packed_idx()
        deform = torch.rand(
            (all_p, 3), dtype=torch.float32, device=clouds.device
        )
        new_clouds_naive = naive_offset(clouds, deform)
        new_clouds = clouds.offset(deform)
        # Cumulative point counts delimit each cloud's slice of deform.
        points_cumsum = torch.cumsum(points_per_cloud, 0).tolist()
        points_cumsum.insert(0, 0)
        for i in range(N):
            self.assertClose(
                new_clouds.points_list()[i],
                clouds.points_list()[i]
                + deform[points_cumsum[i] : points_cumsum[i + 1]],
            )
            # Normals and features must be carried over unchanged.
            self.assertClose(
                clouds.normals_list()[i], new_clouds_naive.normals_list()[i]
            )
            self.assertClose(
                clouds.features_list()[i],
                new_clouds_naive.features_list()[i],
            )
        self.assertCloudsEqual(new_clouds, new_clouds_naive)
def test_scale(self):
    """Pointclouds.scale must multiply each cloud's points by its
    per-cloud scalar; compare against a naive implementation."""
    def naive_scale(cloud, scale):
        if not torch.is_tensor(scale):
            # Broadcast a python scalar to one scale per cloud.
            # Fix: torch.full takes a size *sequence* as its first
            # argument; the original passed a bare int, which raises.
            scale = torch.full((len(cloud),), scale)
        new_points_list = [
            scale[i] * points.clone()
            for (i, points) in enumerate(cloud.points_list())
        ]
        return Pointclouds(
            new_points_list, cloud.normals_list(), cloud.features_list()
        )
    N = 5
    clouds = self.init_cloud(N, 100, 10)
    for force in (False, True):
        if force:
            # Populate cached representations so scale must refresh them.
            clouds._compute_packed(refresh=True)
            clouds._compute_padded()
            clouds.padded_to_packed_idx()
        scales = torch.rand(N)
        new_clouds_naive = naive_scale(clouds, scales)
        new_clouds = clouds.scale(scales)
        for i in range(N):
            self.assertClose(
                scales[i] * clouds.points_list()[i],
                new_clouds.points_list()[i],
            )
            # Normals and features must be carried over unchanged.
            self.assertClose(
                clouds.normals_list()[i], new_clouds_naive.normals_list()[i]
            )
            self.assertClose(
                clouds.features_list()[i],
                new_clouds_naive.features_list()[i],
            )
        self.assertCloudsEqual(new_clouds, new_clouds_naive)
def test_extend_list(self):
    """extend(N) repeats each cloud N times in order; the result shares
    no tensors with the source and starts with empty caches."""
    N = 10
    clouds = self.init_cloud(N, 100, 10)
    for force in (False, True):
        if force:
            # force some computes to happen
            clouds._compute_packed(refresh=True)
            clouds._compute_padded()
            clouds.padded_to_packed_idx()
        new_clouds = clouds.extend(N)
        self.assertEqual(len(clouds) * 10, len(new_clouds))
        for i in range(len(clouds)):
            for n in range(N):
                # Cloud i is repeated at positions i*N .. i*N + N-1.
                self.assertClose(
                    clouds.points_list()[i],
                    new_clouds.points_list()[i * N + n],
                )
                self.assertClose(
                    clouds.normals_list()[i],
                    new_clouds.normals_list()[i * N + n],
                )
                self.assertClose(
                    clouds.features_list()[i],
                    new_clouds.features_list()[i * N + n],
                )
                self.assertTrue(
                    clouds.valid[i] == new_clouds.valid[i * N + n]
                )
        # No tensor may be shared between source and extension.
        self.assertAllSeparate(
            clouds.points_list()
            + new_clouds.points_list()
            + clouds.normals_list()
            + new_clouds.normals_list()
            + clouds.features_list()
            + new_clouds.features_list()
        )
        # The extended object must start with empty caches.
        self.assertIsNone(new_clouds._points_packed)
        self.assertIsNone(new_clouds._normals_packed)
        self.assertIsNone(new_clouds._features_packed)
        self.assertIsNone(new_clouds._points_padded)
        self.assertIsNone(new_clouds._normals_padded)
        self.assertIsNone(new_clouds._features_padded)
    with self.assertRaises(ValueError):
        clouds.extend(N=-1)
def test_to_list(self):
    """Moving a list-constructed cloud to another GPU must preserve
    every representation and all bookkeeping; the source stays on
    cuda:0."""
    cloud = self.init_cloud(5, 100, 10)
    device = torch.device("cuda:1")
    new_cloud = cloud.to(device)
    self.assertTrue(new_cloud.device == device)
    self.assertTrue(cloud.device == torch.device("cuda:0"))
    # All derived tensors must be equal (compared on CPU since they
    # live on different GPUs).
    for attrib in [
        "points_padded",
        "points_packed",
        "normals_padded",
        "normals_packed",
        "features_padded",
        "features_packed",
        "num_points_per_cloud",
        "cloud_to_packed_first_idx",
        "padded_to_packed_idx",
    ]:
        self.assertClose(
            getattr(new_cloud, attrib)().cpu(),
            getattr(cloud, attrib)().cpu(),
        )
    for i in range(len(cloud)):
        self.assertClose(
            cloud.points_list()[i].cpu(), new_cloud.points_list()[i].cpu()
        )
        self.assertClose(
            cloud.normals_list()[i].cpu(), new_cloud.normals_list()[i].cpu()
        )
        self.assertClose(
            cloud.features_list()[i].cpu(),
            new_cloud.features_list()[i].cpu(),
        )
    # Internal bookkeeping carries over unchanged.
    self.assertTrue(all(cloud.valid.cpu() == new_cloud.valid.cpu()))
    self.assertTrue(cloud.equisized == new_cloud.equisized)
    self.assertTrue(cloud._N == new_cloud._N)
    self.assertTrue(cloud._P == new_cloud._P)
    self.assertTrue(cloud._C == new_cloud._C)
def test_to_tensor(self):
    """Same as test_to_list but for a tensor-constructed (equisized)
    cloud: device transfer preserves every representation."""
    cloud = self.init_cloud(5, 100, 10, lists_to_tensors=True)
    device = torch.device("cuda:1")
    new_cloud = cloud.to(device)
    self.assertTrue(new_cloud.device == device)
    self.assertTrue(cloud.device == torch.device("cuda:0"))
    # All derived tensors must be equal (compared on CPU since they
    # live on different GPUs).
    for attrib in [
        "points_padded",
        "points_packed",
        "normals_padded",
        "normals_packed",
        "features_padded",
        "features_packed",
        "num_points_per_cloud",
        "cloud_to_packed_first_idx",
        "padded_to_packed_idx",
    ]:
        self.assertClose(
            getattr(new_cloud, attrib)().cpu(),
            getattr(cloud, attrib)().cpu(),
        )
    for i in range(len(cloud)):
        self.assertClose(
            cloud.points_list()[i].cpu(), new_cloud.points_list()[i].cpu()
        )
        self.assertClose(
            cloud.normals_list()[i].cpu(), new_cloud.normals_list()[i].cpu()
        )
        self.assertClose(
            cloud.features_list()[i].cpu(),
            new_cloud.features_list()[i].cpu(),
        )
    # Internal bookkeeping carries over unchanged.
    self.assertTrue(all(cloud.valid.cpu() == new_cloud.valid.cpu()))
    self.assertTrue(cloud.equisized == new_cloud.equisized)
    self.assertTrue(cloud._N == new_cloud._N)
    self.assertTrue(cloud._P == new_cloud._P)
    self.assertTrue(cloud._C == new_cloud._C)
def test_split(self):
    """Split 5 clouds into groups of sizes [2, 3]; non-integer split
    sizes must raise ValueError."""
    clouds = self.init_cloud(5, 100, 10)
    first, second = clouds.split([2, 3])
    # First group holds clouds 0 and 1.
    self.assertEqual(len(first), 2)
    expected_first = [clouds.get_cloud(0)[0], clouds.get_cloud(1)[0]]
    self.assertTrue(first.points_list() == expected_first)
    # Second group holds clouds 2, 3 and 4.
    self.assertEqual(len(second), 3)
    expected_second = [
        clouds.get_cloud(2)[0],
        clouds.get_cloud(3)[0],
        clouds.get_cloud(4)[0],
    ]
    self.assertTrue(second.points_list() == expected_second)
    # Fractional split sizes are rejected.
    with self.assertRaises(ValueError):
        clouds.split([2, 0.3])
def test_get_cloud(self):
    """get_cloud(i) returns that cloud's points/normals/features;
    out-of-range or non-integer indices raise ValueError."""
    clouds = self.init_cloud(2, 100, 10)
    for idx in range(len(clouds)):
        pts, nrm, feats = clouds.get_cloud(idx)
        self.assertClose(pts, clouds.points_list()[idx])
        self.assertClose(nrm, clouds.normals_list()[idx])
        self.assertClose(feats, clouds.features_list()[idx])
    # Out-of-range and float indices are both invalid.
    for bad_index in (5, 0.2):
        with self.assertRaises(ValueError):
            clouds.get_cloud(bad_index)
def test_get_bounding_boxes(self):
    """The bounding box of each cloud is the (min, max) per coordinate
    over its points, shaped (N, 3, 2)."""
    device = torch.device("cuda:0")
    points_list = []
    for size in [10]:
        points = torch.rand((size, 3), dtype=torch.float32, device=device)
        points_list.append(points)
        # Expected box: per-axis min and max stacked along the last dim.
        bboxes_gt = torch.stack(
            [points.min(dim=0)[0], points.max(dim=0)[0]], dim=1
        ).unsqueeze(0)
    clouds = Pointclouds(points_list)
    self.assertClose(bboxes_gt, clouds.get_bounding_boxes())
def test_padded_to_packed_idx(self):
    """padded_to_packed_idx gives, for each packed point, its flat index
    into the padded tensor viewed as (N*P, 3)."""
    device = torch.device("cuda:0")
    npoints = [10, 20, 30]
    points_list = [
        torch.rand((p, 3), dtype=torch.float32, device=device)
        for p in npoints
    ]
    clouds = Pointclouds(points_list)
    idx_map = clouds.padded_to_packed_idx()
    packed = clouds.points_packed()
    flat_padded = clouds.points_padded().view(-1, 3)
    # Advanced indexing with the map recovers the packed tensor...
    self.assertClose(flat_padded[idx_map], packed)
    # ...and so does gather with the index expanded over xyz.
    expanded = idx_map.view(-1, 1).expand(-1, 3)
    self.assertClose(flat_padded.gather(0, expanded), packed)
def test_getitem(self):
    """Indexing a Pointclouds with an int, list, slice, bool tensor or
    long tensor selects the right clouds; other index types raise
    IndexError."""
    device = torch.device("cuda:0")
    clouds = self.init_cloud(3, 10, 100)
    def check_equal(selected, indices):
        # indices: pairs (index into `selected`, index into `clouds`).
        for selectedIdx, index in indices:
            self.assertClose(
                selected.points_list()[selectedIdx],
                clouds.points_list()[index],
            )
            self.assertClose(
                selected.normals_list()[selectedIdx],
                clouds.normals_list()[index],
            )
            self.assertClose(
                selected.features_list()[selectedIdx],
                clouds.features_list()[index],
            )
    # int index
    index = 1
    clouds_selected = clouds[index]
    self.assertEqual(len(clouds_selected), 1)
    check_equal(clouds_selected, [(0, 1)])
    # list index
    index = [1, 2]
    clouds_selected = clouds[index]
    self.assertEqual(len(clouds_selected), len(index))
    check_equal(clouds_selected, enumerate(index))
    # slice index
    index = slice(0, 2, 1)
    clouds_selected = clouds[index]
    self.assertEqual(len(clouds_selected), 2)
    check_equal(clouds_selected, [(0, 0), (1, 1)])
    # bool tensor
    index = torch.tensor([1, 0, 1], dtype=torch.bool, device=device)
    clouds_selected = clouds[index]
    self.assertEqual(len(clouds_selected), index.sum())
    check_equal(clouds_selected, [(0, 0), (1, 2)])
    # int tensor
    index = torch.tensor([1, 2], dtype=torch.int64, device=device)
    clouds_selected = clouds[index]
    self.assertEqual(len(clouds_selected), index.numel())
    check_equal(clouds_selected, enumerate(index.tolist()))
    # invalid index
    index = torch.tensor([1, 0, 1], dtype=torch.float32, device=device)
    with self.assertRaises(IndexError):
        clouds_selected = clouds[index]
    index = 1.2
    with self.assertRaises(IndexError):
        clouds_selected = clouds[index]
def test_update_padded(self):
    """update_padded replaces the padded points (and optionally normals
    and features): new data must propagate to all representations,
    caches must be reset, and when no new normals/features are given
    the originals must be carried over without copying."""
    N, P, C = 5, 100, 4
    for with_normfeat in (True, False):
        for with_new_normfeat in (True, False):
            clouds = self.init_cloud(
                N,
                P,
                C,
                with_normals=with_normfeat,
                with_features=with_normfeat,
            )
            num_points_per_cloud = clouds.num_points_per_cloud()
            # initialize new points, normals, features
            new_points = torch.rand(
                clouds.points_padded().shape, device=clouds.device
            )
            # Per-cloud views trimmed to each cloud's true length.
            new_points_list = [
                new_points[i, : num_points_per_cloud[i]] for i in range(N)
            ]
            new_normals, new_normals_list = None, None
            new_features, new_features_list = None, None
            if with_new_normfeat:
                new_normals = torch.rand(
                    clouds.points_padded().shape, device=clouds.device
                )
                new_normals_list = [
                    new_normals[i, : num_points_per_cloud[i]]
                    for i in range(N)
                ]
                feat_shape = [
                    clouds.points_padded().shape[0],
                    clouds.points_padded().shape[1],
                    C,
                ]
                new_features = torch.rand(feat_shape, device=clouds.device)
                new_features_list = [
                    new_features[i, : num_points_per_cloud[i]]
                    for i in range(N)
                ]
            # update
            new_clouds = clouds.update_padded(
                new_points, new_normals, new_features
            )
            # List/packed caches must be invalidated by the update.
            self.assertIsNone(new_clouds._points_list)
            self.assertIsNone(new_clouds._points_packed)
            self.assertEqual(new_clouds.equisized, clouds.equisized)
            self.assertTrue(all(new_clouds.valid == clouds.valid))
            # New points must appear in every representation.
            self.assertClose(new_clouds.points_padded(), new_points)
            self.assertClose(
                new_clouds.points_packed(), torch.cat(new_points_list)
            )
            for i in range(N):
                self.assertClose(
                    new_clouds.points_list()[i], new_points_list[i]
                )
            if with_new_normfeat:
                # Supplied normals/features replace the old ones.
                for i in range(N):
                    self.assertClose(
                        new_clouds.normals_list()[i], new_normals_list[i]
                    )
                    self.assertClose(
                        new_clouds.features_list()[i], new_features_list[i]
                    )
                self.assertClose(new_clouds.normals_padded(), new_normals)
                self.assertClose(
                    new_clouds.normals_packed(), torch.cat(new_normals_list)
                )
                self.assertClose(new_clouds.features_padded(), new_features)
                self.assertClose(
                    new_clouds.features_packed(),
                    torch.cat(new_features_list),
                )
            else:
                if with_normfeat:
                    # Old normals/features are carried over, and must
                    # share storage (assertNotSeparate), not be copied.
                    for i in range(N):
                        self.assertClose(
                            new_clouds.normals_list()[i],
                            clouds.normals_list()[i],
                        )
                        self.assertClose(
                            new_clouds.features_list()[i],
                            clouds.features_list()[i],
                        )
                        self.assertNotSeparate(
                            new_clouds.normals_list()[i],
                            clouds.normals_list()[i],
                        )
                        self.assertNotSeparate(
                            new_clouds.features_list()[i],
                            clouds.features_list()[i],
                        )
                    self.assertClose(
                        new_clouds.normals_padded(), clouds.normals_padded()
                    )
                    self.assertClose(
                        new_clouds.normals_packed(), clouds.normals_packed()
                    )
                    self.assertClose(
                        new_clouds.features_padded(),
                        clouds.features_padded(),
                    )
                    self.assertClose(
                        new_clouds.features_packed(),
                        clouds.features_packed(),
                    )
                    self.assertNotSeparate(
                        new_clouds.normals_padded(), clouds.normals_padded()
                    )
                    self.assertNotSeparate(
                        new_clouds.features_padded(),
                        clouds.features_padded(),
                    )
                else:
                    # Neither old nor new aux data: everything is None.
                    self.assertIsNone(new_clouds.normals_list())
                    self.assertIsNone(new_clouds.features_list())
                    self.assertIsNone(new_clouds.normals_padded())
                    self.assertIsNone(new_clouds.features_padded())
                    self.assertIsNone(new_clouds.normals_packed())
                    self.assertIsNone(new_clouds.features_packed())
            # Index bookkeeping is unchanged by update_padded.
            for attrib in [
                "num_points_per_cloud",
                "cloud_to_packed_first_idx",
                "padded_to_packed_idx",
            ]:
                self.assertClose(
                    getattr(new_clouds, attrib)(), getattr(clouds, attrib)()
                )
@staticmethod
def compute_packed_with_init(
    num_clouds: int = 10, max_p: int = 100, features: int = 300
):
    """Benchmark helper: build a cloud up front and return a closure
    that recomputes the packed representation (CUDA-synchronized so
    the timing covers the GPU work)."""
    clouds = TestPointclouds.init_cloud(num_clouds, max_p, features)
    torch.cuda.synchronize()

    def run():
        clouds._compute_packed(refresh=True)
        torch.cuda.synchronize()

    return run
@staticmethod
def compute_padded_with_init(
    num_clouds: int = 10, max_p: int = 100, features: int = 300
):
    """Benchmark helper: build a cloud up front and return a closure
    that recomputes the padded representation (CUDA-synchronized so
    the timing covers the GPU work)."""
    clouds = TestPointclouds.init_cloud(num_clouds, max_p, features)
    torch.cuda.synchronize()

    def run():
        clouds._compute_padded(refresh=True)
        torch.cuda.synchronize()

    return run

View File

@@ -0,0 +1,525 @@
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
import numpy as np
import unittest
import torch
from pytorch3d import _C
from pytorch3d.renderer.points.rasterize_points import (
rasterize_points,
rasterize_points_python,
)
from pytorch3d.structures.pointclouds import Pointclouds
from common_testing import TestCaseMixin
class TestRasterizePoints(TestCaseMixin, unittest.TestCase):
def test_python_simple_cpu(self):
    """Run the simple scene through the pure-Python rasterizer on CPU."""
    device = torch.device("cpu")
    self._simple_test_case(rasterize_points_python, device, bin_size=-1)
def test_naive_simple_cpu(self):
    """Simple scene through the C++ rasterizer on CPU (default binning)."""
    self._simple_test_case(rasterize_points, torch.device("cpu"))
def test_naive_simple_cuda(self):
    """Simple scene on CUDA; bin_size=0 selects the naive kernel."""
    self._simple_test_case(rasterize_points, torch.device("cuda"), bin_size=0)
def test_python_behind_camera(self):
    """Behind-camera points through the pure-Python rasterizer on CPU."""
    device = torch.device("cpu")
    self._test_behind_camera(rasterize_points_python, device, bin_size=-1)
def test_cpu_behind_camera(self):
    """Behind-camera points through the C++ rasterizer on CPU."""
    device = torch.device("cpu")
    self._test_behind_camera(rasterize_points, device)
def test_cuda_behind_camera(self):
    """Behind-camera points on CUDA; bin_size=0 selects the naive kernel."""
    device = torch.device("cuda")
    self._test_behind_camera(rasterize_points, device, bin_size=0)
def test_cpp_vs_naive_vs_binned(self):
    """CPU-naive, CUDA-naive and CUDA-binned rasterization must agree
    on outputs and gradients for a heterogeneous two-cloud batch."""
    # Make sure that the backward pass runs for all pathways
    N = 2
    P = 1000
    image_size = 32
    radius = 0.1
    points_per_pixel = 3
    points1 = torch.randn(P, 3, requires_grad=True)
    points2 = torch.randn(int(P / 2), 3, requires_grad=True)
    pointclouds = Pointclouds(points=[points1, points2])
    # Fixed upstream gradients reused across all three pathways.
    grad_zbuf = torch.randn(N, image_size, image_size, points_per_pixel)
    grad_dists = torch.randn(N, image_size, image_size, points_per_pixel)
    # Option I: CPU, naive
    idx1, zbuf1, dists1 = rasterize_points(
        pointclouds, image_size, radius, points_per_pixel, bin_size=0
    )
    loss = (zbuf1 * grad_zbuf).sum() + (dists1 * grad_dists).sum()
    loss.backward()
    grad1 = points1.grad.data.clone()
    # Option II: CUDA, naive
    points1_cuda = points1.cuda().detach().clone().requires_grad_(True)
    points2_cuda = points2.cuda().detach().clone().requires_grad_(True)
    pointclouds = Pointclouds(points=[points1_cuda, points2_cuda])
    grad_zbuf = grad_zbuf.cuda()
    grad_dists = grad_dists.cuda()
    idx2, zbuf2, dists2 = rasterize_points(
        pointclouds, image_size, radius, points_per_pixel, bin_size=0
    )
    loss = (zbuf2 * grad_zbuf).sum() + (dists2 * grad_dists).sum()
    loss.backward()
    idx2 = idx2.data.cpu().clone()
    zbuf2 = zbuf2.data.cpu().clone()
    dists2 = dists2.data.cpu().clone()
    grad2 = points1_cuda.grad.data.cpu().clone()
    # Option III: CUDA, binned
    points1_cuda = points1.cuda().detach().clone().requires_grad_(True)
    points2_cuda = points2.cuda().detach().clone().requires_grad_(True)
    pointclouds = Pointclouds(points=[points1_cuda, points2_cuda])
    idx3, zbuf3, dists3 = rasterize_points(
        pointclouds, image_size, radius, points_per_pixel, bin_size=32
    )
    loss = (zbuf3 * grad_zbuf).sum() + (dists3 * grad_dists).sum()
    points1.grad.data.zero_()
    loss.backward()
    idx3 = idx3.data.cpu().clone()
    zbuf3 = zbuf3.data.cpu().clone()
    dists3 = dists3.data.cpu().clone()
    grad3 = points1_cuda.grad.data.cpu().clone()
    # Make sure everything was the same
    # Integer indices and z-buffer must match exactly; distances and
    # gradients only up to float tolerance.
    idx12_same = (idx1 == idx2).all().item()
    idx13_same = (idx1 == idx3).all().item()
    zbuf12_same = (zbuf1 == zbuf2).all().item()
    zbuf13_same = (zbuf1 == zbuf3).all().item()
    dists12_diff = (dists1 - dists2).abs().max().item()
    dists13_diff = (dists1 - dists3).abs().max().item()
    self.assertTrue(idx12_same)
    self.assertTrue(idx13_same)
    self.assertTrue(zbuf12_same)
    self.assertTrue(zbuf13_same)
    self.assertTrue(dists12_diff < 1e-6)
    self.assertTrue(dists13_diff < 1e-6)
    diff12 = (grad1 - grad2).abs().max().item()
    diff13 = (grad1 - grad3).abs().max().item()
    diff23 = (grad2 - grad3).abs().max().item()
    self.assertTrue(diff12 < 5e-6)
    self.assertTrue(diff13 < 5e-6)
    self.assertTrue(diff23 < 5e-6)
def test_python_vs_cpu_naive(self):
torch.manual_seed(231)
image_size = 32
radius = 0.1
points_per_pixel = 3
# Test a batch of homogeneous point clouds.
N = 2
P = 17
points = torch.randn(N, P, 3, requires_grad=True)
pointclouds = Pointclouds(points=points)
args = (pointclouds, image_size, radius, points_per_pixel)
self._compare_impls(
rasterize_points_python,
rasterize_points,
args,
args,
points,
points,
compare_grads=True,
)
# Test a batch of heterogeneous point clouds.
P2 = 10
points1 = torch.randn(P, 3, requires_grad=True)
points2 = torch.randn(P2, 3)
pointclouds = Pointclouds(points=[points1, points2])
args = (pointclouds, image_size, radius, points_per_pixel)
self._compare_impls(
rasterize_points_python,
rasterize_points,
args,
args,
points1, # check gradients for first element in batch
points1,
compare_grads=True,
)
def test_cpu_vs_cuda_naive(self):
torch.manual_seed(231)
image_size = 64
radius = 0.1
points_per_pixel = 5
# Test homogeneous point cloud batch.
N = 2
P = 1000
bin_size = 0
points_cpu = torch.rand(N, P, 3, requires_grad=True)
points_cuda = points_cpu.cuda().detach().requires_grad_(True)
pointclouds_cpu = Pointclouds(points=points_cpu)
pointclouds_cuda = Pointclouds(points=points_cuda)
args_cpu = (
pointclouds_cpu,
image_size,
radius,
points_per_pixel,
bin_size,
)
args_cuda = (
pointclouds_cuda,
image_size,
radius,
points_per_pixel,
bin_size,
)
self._compare_impls(
rasterize_points,
rasterize_points,
args_cpu,
args_cuda,
points_cpu,
points_cuda,
compare_grads=True,
)
def _compare_impls(
self,
fn1,
fn2,
args1,
args2,
grad_var1=None,
grad_var2=None,
compare_grads=False,
):
idx1, zbuf1, dist1 = fn1(*args1)
torch.manual_seed(231)
grad_zbuf = torch.randn_like(zbuf1)
grad_dist = torch.randn_like(dist1)
loss = (zbuf1 * grad_zbuf).sum() + (dist1 * grad_dist).sum()
if compare_grads:
loss.backward()
grad_points1 = grad_var1.grad.data.clone().cpu()
idx2, zbuf2, dist2 = fn2(*args2)
grad_zbuf = grad_zbuf.to(zbuf2)
grad_dist = grad_dist.to(dist2)
loss = (zbuf2 * grad_zbuf).sum() + (dist2 * grad_dist).sum()
if compare_grads:
# clear points1.grad in case args1 and args2 reused the same tensor
grad_var1.grad.data.zero_()
loss.backward()
grad_points2 = grad_var2.grad.data.clone().cpu()
self.assertEqual((idx1.cpu() == idx2.cpu()).all().item(), 1)
self.assertEqual((zbuf1.cpu() == zbuf2.cpu()).all().item(), 1)
self.assertClose(dist1.cpu(), dist2.cpu())
if compare_grads:
self.assertTrue(
torch.allclose(grad_points1, grad_points2, atol=2e-6)
)
def _test_behind_camera(self, rasterize_points_fn, device, bin_size=None):
# Test case where all points are behind the camera -- nothing should
# get rasterized
N = 2
P = 32
xy = torch.randn(N, P, 2)
z = torch.randn(N, P, 1).abs().mul(-1) # Make them all negative
points = torch.cat([xy, z], dim=2).to(device)
image_size = 16
points_per_pixel = 3
radius = 0.2
idx_expected = torch.full(
(N, 16, 16, 3), fill_value=-1, dtype=torch.int32, device=device
)
zbuf_expected = torch.full(
(N, 16, 16, 3), fill_value=-1, dtype=torch.float32, device=device
)
dists_expected = zbuf_expected.clone()
pointclouds = Pointclouds(points=points)
if bin_size == -1:
# simple python case with no binning
idx, zbuf, dists = rasterize_points_fn(
pointclouds, image_size, radius, points_per_pixel
)
else:
idx, zbuf, dists = rasterize_points_fn(
pointclouds, image_size, radius, points_per_pixel, bin_size
)
idx_same = (idx == idx_expected).all().item() == 1
zbuf_same = (zbuf == zbuf_expected).all().item() == 1
self.assertTrue(idx_same)
self.assertTrue(zbuf_same)
self.assertTrue(torch.allclose(dists, dists_expected))
    def _simple_test_case(self, rasterize_points_fn, device, bin_size=0):
        """
        Rasterize two small hand-constructed point clouds with known expected
        outputs and check idx / zbuf / dists against them. The second cloud
        repeats the first cloud's visible points plus extra behind-camera
        points, so its expected indices are the first cloud's indices offset
        by the first cloud's point count.
        """
        # Create two pointclouds with different numbers of points.
        # fmt: off
        points1 = torch.tensor(
            [
                [0.0, 0.0, 0.0], # noqa: E241
                [0.4, 0.0, 0.1], # noqa: E241
                [0.0, 0.4, 0.2], # noqa: E241
                [0.0, 0.0, -0.1], # noqa: E241 Points with negative z should be skipped
            ],
            device=device,
        )
        points2 = torch.tensor(
            [
                [0.0, 0.0, 0.0], # noqa: E241
                [0.4, 0.0, 0.1], # noqa: E241
                [0.0, 0.4, 0.2], # noqa: E241
                [0.0, 0.0, -0.1], # noqa: E241 Points with negative z should be skipped
                [0.0, 0.0, -0.7], # noqa: E241 Points with negative z should be skipped
            ],
            device=device,
        )
        # fmt: on
        pointclouds = Pointclouds(points=[points1, points2])
        image_size = 5
        points_per_pixel = 2
        radius = 0.5
        # The expected output values. Note that in the outputs, the world space
        # +Y is up, and the world space +X is left.
        # Expected packed point indices, one 5x5 map per points-per-pixel slot.
        idx1_expected = torch.full(
            (1, 5, 5, 2), fill_value=-1, dtype=torch.int32, device=device
        )
        # fmt: off
        idx1_expected[0, :, :, 0] = torch.tensor([
            [-1, -1, 2, -1, -1], # noqa: E241
            [-1, 1, 0, 2, -1], # noqa: E241
            [ 1, 0, 0, 0, -1], # noqa: E241 E201
            [-1, 1, 0, -1, -1], # noqa: E241
            [-1, -1, -1, -1, -1], # noqa: E241
        ], device=device)
        idx1_expected[0, :, :, 1] = torch.tensor([
            [-1, -1, -1, -1, -1], # noqa: E241
            [-1, 2, 2, -1, -1], # noqa: E241
            [-1, 1, 1, -1, -1], # noqa: E241
            [-1, -1, -1, -1, -1], # noqa: E241
            [-1, -1, -1, -1, -1], # noqa: E241
        ], device=device)
        # fmt: on
        # Expected depth buffer; -1 marks pixels covered by no point.
        zbuf1_expected = torch.full(
            (1, 5, 5, 2), fill_value=100, dtype=torch.float32, device=device
        )
        # fmt: off
        zbuf1_expected[0, :, :, 0] = torch.tensor([
            [-1.0, -1.0, 0.2, -1.0, -1.0], # noqa: E241
            [-1.0, 0.1, 0.0, 0.2, -1.0], # noqa: E241
            [ 0.1, 0.0, 0.0, 0.0, -1.0], # noqa: E241 E201
            [-1.0, 0.1, 0.0, -1.0, -1.0], # noqa: E241
            [-1.0, -1.0, -1.0, -1.0, -1.0] # noqa: E241
        ], device=device)
        zbuf1_expected[0, :, :, 1] = torch.tensor([
            [-1.0, -1.0, -1.0, -1.0, -1.0], # noqa: E241
            [-1.0, 0.2, 0.2, -1.0, -1.0], # noqa: E241
            [-1.0, 0.1, 0.1, -1.0, -1.0], # noqa: E241
            [-1.0, -1.0, -1.0, -1.0, -1.0], # noqa: E241
            [-1.0, -1.0, -1.0, -1.0, -1.0], # noqa: E241
        ], device=device)
        # fmt: on
        # Expected squared distances from pixel center to point, in NDC units.
        dists1_expected = torch.full(
            (1, 5, 5, 2), fill_value=0.0, dtype=torch.float32, device=device
        )
        # fmt: off
        dists1_expected[0, :, :, 0] = torch.tensor([
            [-1.00, -1.00, 0.16, -1.00, -1.00], # noqa: E241
            [-1.00, 0.16, 0.16, 0.16, -1.00], # noqa: E241
            [ 0.16, 0.16, 0.00, 0.16, -1.00], # noqa: E241 E201
            [-1.00, 0.16, 0.16, -1.00, -1.00], # noqa: E241
            [-1.00, -1.00, -1.00, -1.00, -1.00], # noqa: E241
        ], device=device)
        dists1_expected[0, :, :, 1] = torch.tensor([
            [-1.00, -1.00, -1.00, -1.00, -1.00], # noqa: E241
            [-1.00, 0.16, 0.00, -1.00, -1.00], # noqa: E241
            [-1.00, 0.00, 0.16, -1.00, -1.00], # noqa: E241
            [-1.00, -1.00, -1.00, -1.00, -1.00], # noqa: E241
            [-1.00, -1.00, -1.00, -1.00, -1.00], # noqa: E241
        ], device=device)
        # fmt: on
        # bin_size == -1 marks the pure python rasterizer, which takes no
        # bin_size argument.
        if bin_size == -1:
            # simple python case with no binning
            idx, zbuf, dists = rasterize_points_fn(
                pointclouds, image_size, radius, points_per_pixel
            )
        else:
            idx, zbuf, dists = rasterize_points_fn(
                pointclouds, image_size, radius, points_per_pixel, bin_size
            )
        # check first point cloud
        idx_same = (idx[0, ...] == idx1_expected).all().item() == 1
        # Print the actual maps as a debugging aid when the check fails.
        if idx_same == 0:
            print(idx[0, :, :, 0])
            print(idx[0, :, :, 1])
        zbuf_same = (zbuf[0, ...] == zbuf1_expected).all().item() == 1
        dist_same = torch.allclose(dists[0, ...], dists1_expected)
        self.assertTrue(idx_same)
        self.assertTrue(zbuf_same)
        self.assertTrue(dist_same)
        # Check second point cloud - the indices in idx refer to points in the
        # pointclouds.points_packed() tensor. In the second point cloud,
        # two points are behind the screen - the expected indices are the same
        # the first pointcloud but offset by the number of points in the
        # first pointcloud.
        num_points_per_cloud = pointclouds.num_points_per_cloud()
        idx1_expected[idx1_expected >= 0] += num_points_per_cloud[0]
        idx_same = (idx[1, ...] == idx1_expected).all().item() == 1
        zbuf_same = (zbuf[1, ...] == zbuf1_expected).all().item() == 1
        self.assertTrue(idx_same)
        self.assertTrue(zbuf_same)
        self.assertTrue(torch.allclose(dists[1, ...], dists1_expected))
def test_coarse_cpu(self):
return self._test_coarse_rasterize(torch.device("cpu"))
def test_coarse_cuda(self):
return self._test_coarse_rasterize(torch.device("cuda"))
def test_compare_coarse_cpu_vs_cuda(self):
torch.manual_seed(231)
N = 3
max_P = 1000
image_size = 64
radius = 0.1
bin_size = 16
max_points_per_bin = 500
# create heterogeneous point clouds
points = []
for _ in range(N):
p = np.random.choice(max_P)
points.append(torch.randn(p, 3))
pointclouds = Pointclouds(points=points)
points_packed = pointclouds.points_packed()
cloud_to_packed_first_idx = pointclouds.cloud_to_packed_first_idx()
num_points_per_cloud = pointclouds.num_points_per_cloud()
args = (
points_packed,
cloud_to_packed_first_idx,
num_points_per_cloud,
image_size,
radius,
bin_size,
max_points_per_bin,
)
bp_cpu = _C._rasterize_points_coarse(*args)
pointclouds_cuda = pointclouds.to("cuda:0")
points_packed = pointclouds_cuda.points_packed()
cloud_to_packed_first_idx = pointclouds_cuda.cloud_to_packed_first_idx()
num_points_per_cloud = pointclouds_cuda.num_points_per_cloud()
args = (
points_packed,
cloud_to_packed_first_idx,
num_points_per_cloud,
image_size,
radius,
bin_size,
max_points_per_bin,
)
bp_cuda = _C._rasterize_points_coarse(*args)
# Bin points might not be the same: CUDA version might write them in
# any order. But if we sort the non-(-1) elements of the CUDA output
# then they should be the same.
for n in range(N):
for by in range(bp_cpu.shape[1]):
for bx in range(bp_cpu.shape[2]):
K = (bp_cpu[n, by, bx] != -1).sum().item()
idxs_cpu = bp_cpu[n, by, bx].tolist()
idxs_cuda = bp_cuda[n, by, bx].tolist()
idxs_cuda[:K] = sorted(idxs_cuda[:K])
self.assertEqual(idxs_cpu, idxs_cuda)
    def _test_coarse_rasterize(self, device):
        """
        Run the coarse (binning) rasterizer on a small hand-constructed cloud
        and check that each point lands in exactly the expected bins.
        """
        #
        # Note that +Y is up and +X is left in the diagram below.
        #
        #   (4) |2
        #       |
        #       |
        #       |
        #       |1
        #       |
        #   (1) |
        #       |    (2)
        # ____________(0)__(5)___________________
        # 2    1     |    -1       -2
        #            |
        #       (3)  |
        #            |
        #            |-1
        #            |
        #
        # Locations of the points are shown by the numbers in parentheses.
        # The screen bounding box is between [-1, 1] in both the x and y
        # directions.
        #
        # These points are interesting because:
        # (0) Falls into two bins;
        # (1) and (2) fall into one bin;
        # (3) is out-of-bounds, but its disk is in-bounds;
        # (4) is out-of-bounds, and its entire disk is also out-of-bounds
        # (5) has a negative z-value, so it should be skipped
        # fmt: off
        points = torch.tensor(
            [
                [ 0.5,  0.0,  0.0],  # noqa: E241, E201
                [ 0.5,  0.5,  0.1],  # noqa: E241, E201
                [-0.3,  0.4,  0.0],  # noqa: E241
                [ 1.1, -0.5,  0.2],  # noqa: E241, E201
                [ 2.0,  2.0,  0.3],  # noqa: E241, E201
                [ 0.0,  0.0, -0.1],  # noqa: E241, E201
            ],
            device=device
        )
        # fmt: on
        image_size = 16
        radius = 0.2
        bin_size = 8
        max_points_per_bin = 5
        # One cloud, a 2x2 grid of bins, up to 5 point slots per bin; -1
        # marks an empty slot.
        bin_points_expected = -1 * torch.ones(
            1, 2, 2, 5, dtype=torch.int32, device=device
        )
        # Note that the order is only deterministic here for CUDA if all points
        # fit in one chunk. This will be the case for this small example, but
        # to properly exercise coordinated writes among multiple chunks we need
        # to use a bigger test case.
        bin_points_expected[0, 1, 0, :2] = torch.tensor([0, 3])
        bin_points_expected[0, 0, 1, 0] = torch.tensor([2])
        bin_points_expected[0, 0, 0, :2] = torch.tensor([0, 1])

        pointclouds = Pointclouds(points=[points])
        args = (
            pointclouds.points_packed(),
            pointclouds.cloud_to_packed_first_idx(),
            pointclouds.num_points_per_cloud(),
            image_size,
            radius,
            bin_size,
            max_points_per_bin,
        )
        bin_points = _C._rasterize_points_coarse(*args)
        bin_points_same = (bin_points == bin_points_expected).all()
        self.assertTrue(bin_points_same.item() == 1)