-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrobot.py
More file actions
252 lines (212 loc) · 8.52 KB
/
robot.py
File metadata and controls
252 lines (212 loc) · 8.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import importlib.machinery
import importlib.util
import struct
from dataclasses import dataclass
from pathlib import Path
from threading import Event, Thread
finder = importlib.machinery.PathFinder()
spec = finder.find_spec(
'robot_interface',
[str(Path(__file__).resolve().parent)]
)
if spec is None:
raise
sdk = importlib.util.module_from_spec(spec)
spec.loader.exec_module(sdk)
id_to_real_index = {
'FR_0': 0, 'FR_1': 1, 'FR_2': 2,
'FL_0': 3, 'FL_1': 4, 'FL_2': 5,
'RR_0': 6, 'RR_1': 7, 'RR_2': 8,
'RL_0': 9, 'RL_1': 10, 'RL_2': 11,
}
real_index_to_id = {v: k for k, v in id_to_real_index.items()}
name_to_id = {
'FL_hip_joint': 'FL_0', 'FL_thigh_joint': 'FL_1', 'FL_calf_joint': 'FL_2',
'FR_hip_joint': 'FR_0', 'FR_thigh_joint': 'FR_1', 'FR_calf_joint': 'FR_2',
'RL_hip_joint': 'RL_0', 'RL_thigh_joint': 'RL_1', 'RL_calf_joint': 'RL_2',
'RR_hip_joint': 'RR_0', 'RR_thigh_joint': 'RR_1', 'RR_calf_joint': 'RR_2',
}
id_to_name = {v: k for k, v in name_to_id.items()}
sim_index_to_name = [
'FL_hip_joint', 'FL_thigh_joint', 'FL_calf_joint',
'FR_hip_joint', 'FR_thigh_joint', 'FR_calf_joint',
'RL_hip_joint', 'RL_thigh_joint', 'RL_calf_joint',
'RR_hip_joint', 'RR_thigh_joint', 'RR_calf_joint',
]
name_to_sim_index = {name: i for i, name in enumerate(sim_index_to_name)}
name_to_q0 = {
'FL_hip_joint': 0.1,
'RL_hip_joint': 0.1,
'FR_hip_joint': -0.1,
'RR_hip_joint': -0.1,
'FL_thigh_joint': 0.8,
'RL_thigh_joint': 1.0,
'FR_thigh_joint': 0.8,
'RR_thigh_joint': 1.0,
'FL_calf_joint': -1.5,
'RL_calf_joint': -1.5,
'FR_calf_joint': -1.5,
'RR_calf_joint': -1.5,
}
num_joints = len(sim_index_to_name)
# sim index to real index mapping is for real-to-sim conversion
# for sim_idx, real_idx in enumerate(sim_index_to_real_index):
# assert real_joints[real_idx] == sim_joints[sim_idx]
sim_idx_to_real_idx = [id_to_real_index[name_to_id[name]] for name in sim_index_to_name]
# real index to sim index mapping is for sim-to-real conversion
# even if dict is ordered in Python 3.7 or later, it is not sorted by its key
# thus, we need to use `real_index_to_id[i] for i in range(num_joints)`
# instead of `for id in real_index_to_id.values()`
real_idx_to_sim_idx = [name_to_sim_index[id_to_name[real_index_to_id[i]]] for i in range(num_joints)]
q0_sim = [name_to_q0[name] for name in sim_index_to_name]
q0_real = [name_to_q0[id_to_name[real_index_to_id[i]]] for i in range(num_joints)]
@dataclass
class RobotObservation:
joint_position: 'list[float]'
joint_velocity: 'list[float]'
gyroscope: 'list[float]'
quaternion: 'list[float]'
roll: float
pitch: float
yaw: float
lx: float
ly: float
rx: float
ry: float
L1: bool
L2: bool
class Robot:
LOWLEVEL = 0xFF
def __init__(self):
self.state = sdk.LowState()
# pybind11 will convert std::array into python list
self.motor_state_real = self.state.motorState[:12] # std::array<MotorState, 20>
self.motor_state_sim = [self.motor_state_real[i] for i in sim_idx_to_real_idx]
self.imu = self.state.imu
# the struct module does have compiled format cache
# reuse the Struct object explicitly for clarity, not performance
self.rocker_struct = struct.Struct('@5f')
self.Δq_real = [float('NaN') for _ in range(12)]
self.stopped = Event()
self.background_thread = Thread(target=self._send_recv_loop, daemon=True)
self.background_thread.start()
def _send_recv_loop(self):
# parameter
Kp = 20.0
Kd = 0.5
# shorthand
state = self.state
stopped = self.stopped
# setup
udp = sdk.UDP(Robot.LOWLEVEL, 8080, '192.168.123.10', 8007)
safe = sdk.Safety(sdk.LeggedType.Go1)
udp.Send() # make MCU happy
while not stopped.wait(0.005):
udp.Recv()
udp.GetRecv(state)
if state.tick != 0:
self.Δq_real = [ms.q - q0 for ms, q0 in zip(state.motorState, q0_real)]
break
# === Loop Invariant Code Motion ===
cmd = sdk.LowCmd()
udp.InitCmdData(cmd)
motor_cmd = cmd.motorCmd[:12]
# PositionLimit and PowerProtect won't modify dq, Kp, Kd, tau
for mc in motor_cmd:
mc.dq = 0.0 # expected velocity
mc.Kp = Kp # stiffness
mc.Kd = Kd # damping
mc.tau = 0.0 # expected torque
# === Loop Invariant Code Motion ===
# Microbenchmark for torch.Tensor versus builtin.list
# for len=12, torch.float32 and builtin.float(f64)
# on Go1.NX CPU and Python 3.6.9
# copy:
# tolist + copy from list cost ~(1.5+2.3) μs
# directly copy from torch tensor to list cost ~70 μs
# permute:
# permute tensor by tensor cost ~11 μs
# permute list by list cost ~2.5 μs (assignment) / ~1.9 μs (comprenhension)
# add:
# add tensor by tensor cost ~5.9us
# add list by list cost ~3.1us
#
# additional observation:
# tensor has a much higher peak latency than list
#
# conclusion:
# for len=12, use list instead of tensor
# Microbenchmark for pybind11 STL container automatic conversion performance
# NOTE: This automatic conversions involve a copy operation
# that prevents pass-by-reference semantics.
# However, the elements in the container are still passed by reference.
# NOTE: state.motorState[i] will first construct a list, then get item from it.
# Thus, [state.motorState[i].q for i in range(12)] will construct the list 12 times.
# on Go1.NX CPU and Python 3.6.9
# x = [state.motorState[i].q for i in range(12)] cost ~175 μs
# x = [ms.q for ms in motor_state] cost ~9 μs
# cmd.motorCmd[i].q = qs[i] cost ~185 μs
# mc.q = q for mc, q in zip(motor_cmd, qs) cost ~12 μs
while not stopped.wait(0.005):
for mc, q0, Δq in zip(motor_cmd, q0_real, self.Δq_real):
mc.q = q0 + Δq # expected position
safe.PositionLimit(cmd)
udp.Recv()
udp.GetRecv(state)
safe.PowerProtect(cmd, state, 5)
udp.SetSend(cmd)
udp.Send()
def get_obs(self):
# shorthand
motor_state_sim = self.motor_state_sim
joint_position = [ms.q - q0 for ms, q0 in zip(motor_state_sim, q0_sim)]
joint_velocity = [ms.dq for ms in motor_state_sim]
# imu.gyroscope & .rpy: std::array<float, 3>
# imu.quaternion: std::array<float, 4>
# => list[float]
imu = self.imu
gyroscope = imu.gyroscope # rpy order, rad/s
quaternion = imu.quaternion # (w, x, y, z) order, normalized
roll, pitch, yaw = imu.rpy # rpy order, rad
rc = self.state.wirelessRemote # std::array<uint8_t, 40> => list[int]
# stdlib struct.unpack is faster than convert tensor then .view(torch.float32)
# and torch<=1.10 only support .view to dtype with same size
lx, rx, ry, _, ly = self.rocker_struct.unpack(bytes(rc[4:24]))
# LSB -> MSB
# rc[2] = [R1, L1, start, select][R2, L2, F1, F2]
# rc[3] = [A, B, X, Y][up, right, down, left]
button_L1 = rc[2] & 0b0000_0010
button_L2 = rc[2] & 0b0010_0000
return RobotObservation(
joint_position=joint_position, # in sim order, relative to q0
joint_velocity=joint_velocity, # in sim order
gyroscope=gyroscope,
quaternion=quaternion,
roll=roll,
pitch=pitch,
yaw=yaw,
lx=lx,
ly=ly,
rx=rx,
ry=ry,
L1=bool(button_L1),
L2=bool(button_L2),
)
def set_act(self, action: 'list[float]'):
self.Δq_real = [action[i] for i in real_idx_to_sim_idx]
def init(self):
import math
stopped = self.stopped
while any(math.isnan(Δq) for Δq in self.Δq_real) and not stopped.wait(0.05):
pass
Δq_sequence = []
Δq_t = self.Δq_real
while any(abs(Δq) > 0.01 for Δq in Δq_t):
# reduce Δq magnitude by 0.05 rad per step
Δq_t = [math.copysign(max(0, abs(Δq) - 0.05), Δq) for Δq in Δq_t]
Δq_sequence.append(Δq_t)
for Δq_t in Δq_sequence:
self.Δq_real = Δq_t
# 0.05 sec per step
if stopped.wait(0.05):
break