import numpy as np
import taichi as ti
import torch
import genesis as gs
from genesis.engine.states.entities import MPMEntityState
from genesis.utils.misc import to_gs_tensor
from .particle_entity import ParticleEntity
[文档]@ti.data_oriented
class MPMEntity(ParticleEntity):
"""
MPM-based particle entity.
"""
def __init__(
self, scene, solver, material, morph, surface, particle_size, idx, particle_start, vvert_start, vface_start
):
if isinstance(material, (gs.materials.MPM.Liquid, gs.materials.MPM.Sand, gs.materials.MPM.Snow)):
need_skinning = False
else:
need_skinning = True
super().__init__(
scene,
solver,
material,
morph,
surface,
particle_size,
idx,
particle_start,
vvert_start,
vface_start,
need_skinning=need_skinning,
)
[文档] def init_tgt_keys(self):
self._tgt_keys = ["vel", "pos", "act", "actu"]
def _add_to_solver_(self):
self._solver._kernel_add_particles(
self._sim.cur_substep_local,
self.active,
self._particle_start,
self._n_particles,
self._material.idx,
self._material._default_Jp,
self._material.rho,
self._particles,
)
[文档] def set_pos(self, f, pos):
self.solver._kernel_set_particles_pos(
f,
self._particle_start,
self._n_particles,
pos,
)
[文档] def set_pos_grad(self, f, pos_grad):
self.solver._kernel_set_particles_pos_grad(
f,
self._particle_start,
self._n_particles,
pos_grad,
)
[文档] def set_vel(self, f, vel):
self.solver._kernel_set_particles_vel(
f,
self._particle_start,
self._n_particles,
vel,
)
[文档] def set_vel_grad(self, f, vel_grad):
self.solver._kernel_set_particles_vel_grad(
f,
self._particle_start,
self._n_particles,
vel_grad,
)
[文档] def set_actu(self, f, actu):
self.solver._kernel_set_particles_actu(
f,
self._particle_start,
self._n_particles,
self.material.n_groups,
actu,
)
[文档] def set_actu_grad(self, f, actu_grad):
self.solver._kernel_set_particles_actu_grad(
f,
self._particle_start,
self._n_particles,
actu_grad,
)
[文档] def set_muscle_group(self, muscle_group):
self.solver._kernel_set_muscle_group(
self._particle_start,
self._n_particles,
muscle_group,
)
[文档] def get_muscle_group(self):
muscle_group = gs.zeros((self._n_particles,), dtype=int, requires_grad=False, scene=self._scene)
self.solver._kernel_get_muscle_group(
self._particle_start,
self._n_particles,
muscle_group,
)
return muscle_group
[文档] def set_muscle_direction(self, muscle_direction):
self.solver._kernel_set_muscle_direction(
self._particle_start,
self._n_particles,
muscle_direction,
)
[文档] def set_active(self, f, active):
self.solver._kernel_set_particles_active(
f,
self._particle_start,
self._n_particles,
active,
)
[文档] def set_active_arr(self, f, active):
self.solver._kernel_set_particles_active_arr(
f,
self._particle_start,
self._n_particles,
active,
)
[文档] def set_actuation(self, actu):
self._assert_active()
actu = to_gs_tensor(actu)
n_groups = getattr(self.material, "n_groups", 1)
if len(actu.shape) == 0:
assert actu.shape == ()
self._tgt["actu"] = torch.tile(actu, [self.n_particles, n_groups])
elif len(actu.shape) == 1:
if actu.shape[0] == n_groups:
assert self.n_particles != n_groups # ambiguous
actu = actu.tile([self.n_particles, 1])
else:
assert actu.shape == (self.n_particles,)
gs.raise_exception("Cannot set per-particle actuation")
self._tgt["actu"] = actu
else:
gs.raise_exception("Tensor shape not supported.")
[文档] def set_muscle(self, muscle_group=None, muscle_direction=None):
self._assert_active()
if muscle_group is not None:
n_groups = getattr(self.material, "n_groups", 1)
max_group_id = muscle_group.max().item()
muscle_group = to_gs_tensor(muscle_group)
assert muscle_group.shape == (self.n_particles,)
assert isinstance(max_group_id, int) and max_group_id < n_groups
self.set_muscle_group(muscle_group)
if muscle_direction is not None:
muscle_direction = to_gs_tensor(muscle_direction)
assert muscle_direction.shape == (self.n_particles, 3)
assert torch.allclose(muscle_direction.norm(dim=-1), torch.Tensor([1.0]).to(muscle_direction))
self.set_muscle_direction(muscle_direction)
[文档] @ti.kernel
def clear_grad(self, f: ti.i32):
for i in range(self.n_particles):
i_global = i + self._particle_start
self._solver.particles.grad[f, i_global].pos = 0
self._solver.particles.grad[f, i_global].vel = 0
self._solver.particles.grad[f, i_global].C = 0
self._solver.particles.grad[f, i_global].F = 0
self._solver.particles.grad[f, i_global].F_tmp = 0
self._solver.particles.grad[f, i_global].Jp = 0
self._solver.particles.grad[f, i_global].U = 0
self._solver.particles.grad[f, i_global].V = 0
self._solver.particles.grad[f, i_global].S = 0
self._solver.particles.grad[f, i_global].actu = 0
[文档] @ti.kernel
def get_frame(
self,
f: ti.i32,
pos: ti.types.ndarray(),
vel: ti.types.ndarray(),
C: ti.types.ndarray(),
F: ti.types.ndarray(),
Jp: ti.types.ndarray(),
active: ti.types.ndarray(),
):
for i in range(self.n_particles):
i_global = i + self._particle_start
for j in ti.static(range(3)):
pos[i, j] = self._solver.particles[f, i_global].pos[j]
vel[i, j] = self._solver.particles[f, i_global].vel[j]
for k in ti.static(range(3)):
C[i, j, k] = self._solver.particles[f, i_global].C[j, k]
F[i, j, k] = self._solver.particles[f, i_global].F[j, k]
Jp[i] = self._solver.particles[f, i_global].Jp
active[i] = self._solver.particles_ng[f, i_global].active
[文档] @ti.kernel
def set_frame_add_grad_pos(self, f: ti.i32, pos_grad: ti.types.ndarray()):
for i in range(self.n_particles):
i_global = i + self._particle_start
for j in ti.static(range(3)):
self._solver.particles.grad[f, i_global].pos[j] += pos_grad[i, j]
[文档] @ti.kernel
def set_frame_add_grad_vel(self, f: ti.i32, vel_grad: ti.types.ndarray()):
for i in range(self.n_particles):
i_global = i + self._particle_start
for j in ti.static(range(3)):
self._solver.particles.grad[f, i_global].vel[j] += vel_grad[i, j]
[文档] @ti.kernel
def set_frame_add_grad_C(self, f: ti.i32, C_grad: ti.types.ndarray()):
for i in range(self.n_particles):
i_global = i + self._particle_start
for j in ti.static(range(3)):
for k in ti.static(range(3)):
self._solver.particles.grad[f, i_global].C[j, k] += C_grad[i, j, k]
[文档] @ti.kernel
def set_frame_add_grad_F(self, f: ti.i32, F_grad: ti.types.ndarray()):
for i in range(self.n_particles):
i_global = i + self._particle_start
for j in ti.static(range(3)):
for k in ti.static(range(3)):
self._solver.particles.grad[f, i_global].F[j, k] += F_grad[i, j, k]
[文档] @ti.kernel
def set_frame_add_grad_Jp(self, f: ti.i32, Jp_grad: ti.types.ndarray()):
for i in range(self.n_particles):
i_global = i + self._particle_start
self._solver.particles.grad[f, i_global].Jp += Jp_grad[i]
[文档] def add_grad_from_state(self, state):
if state.pos.grad is not None:
state.pos.assert_contiguous()
self.set_frame_add_grad_pos(self._sim.cur_substep_local, state.pos.grad)
if state.vel.grad is not None:
state.vel.assert_contiguous()
self.set_frame_add_grad_vel(self._sim.cur_substep_local, state.vel.grad)
if state.C.grad is not None:
state.C.assert_contiguous()
self.set_frame_add_grad_C(self._sim.cur_substep_local, state.C.grad)
if state.F.grad is not None:
state.F.assert_contiguous()
self.set_frame_add_grad_F(self._sim.cur_substep_local, state.F.grad)
if state.Jp.grad is not None:
state.Jp.assert_contiguous()
self.set_frame_add_grad_Jp(self._sim.cur_substep_local, state.Jp.grad)
[文档] @gs.assert_built
def get_particles(self):
pos = np.empty((self.n_particles, 3), dtype=gs.np_float)
self._kernel_get_particles(self._sim.cur_substep_local, pos)
return pos
@ti.kernel
def _kernel_get_particles(self, f: ti.i32, pos: ti.types.ndarray()):
for i in range(self.n_particles):
i_global = i + self._particle_start
for j in ti.static(range(3)):
pos[i, j] = self._solver.particles[f, i_global].pos[j]
[文档] @gs.assert_built
def get_state(self):
state = MPMEntityState(self, self._sim.cur_step_global)
self.get_frame( # TODO: merge with self._solver.get_state?!
f=self._sim.cur_substep_local,
pos=state.pos,
vel=state.vel,
C=state.C,
F=state.F,
Jp=state.Jp,
active=state.active,
)
# we store all queried states to track gradient flow
self._queried_states.append(state)
return state
@ti.kernel
def _kernel_get_mass(self, mass: ti.types.ndarray()):
total_mass = 0.0
for i in range(self.n_particles):
i_global = i + self._particle_start
total_mass += self._solver.particles_info[i_global].mass / self._solver._p_vol_scale
mass[0] = total_mass