update serial and spatial force components
This commit is contained in:
@@ -906,8 +906,361 @@ class PressureDirectionEstimator:
|
||||
|
||||
return angle, mag
|
||||
|
||||
@dataclass
|
||||
class LocalForceResult:
|
||||
angle: float
|
||||
magnitude: float
|
||||
planar_x: float
|
||||
planar_y: float
|
||||
confidence: float
|
||||
contact_active: bool
|
||||
reportable: bool
|
||||
total_pressure: float
|
||||
peak: float
|
||||
cop_x: float
|
||||
cop_y: float
|
||||
threshold: float
|
||||
|
||||
_estimator = PressureDirectionEstimator()
|
||||
|
||||
class LocalTangentialForceEstimator:
|
||||
CONTACT_ENTER_TOTAL_THRESHOLD = 520.0
|
||||
CONTACT_ENTER_PEAK_THRESHOLD = 50.0
|
||||
CONTACT_EXIT_TOTAL_THRESHOLD = 260.0
|
||||
CONTACT_EXIT_PEAK_THRESHOLD = 28.0
|
||||
CONTACT_ENTER_FRAMES_REQUIRED = 2
|
||||
CONTACT_EXIT_FRAMES_REQUIRED = 8
|
||||
|
||||
BASELINE_IDLE_ALPHA = 0.035
|
||||
BASELINE_BOOTSTRAP_ALPHA = 1.0
|
||||
BASELINE_NOISE_FLOOR = 5.0
|
||||
|
||||
ACTIVE_CELL_MIN_VALUE = 18.0
|
||||
ACTIVE_CELL_PEAK_RATIO = 0.14
|
||||
MIN_ACTIVE_CELLS = 3
|
||||
|
||||
VECTOR_SMOOTHING_ALPHA = 0.16
|
||||
REPORT_MAGNITUDE_ENTER = 0.12
|
||||
REPORT_MAGNITUDE_EXIT = 0.045
|
||||
REPORT_CONFIDENCE_ENTER = 0.14
|
||||
REPORT_CONFIDENCE_EXIT = 0.06
|
||||
REPORT_HOLD_FRAMES = 10
|
||||
|
||||
ASYMMETRY_WEIGHT = 1.1
|
||||
DRIFT_WEIGHT = 0.65
|
||||
MOTION_WEIGHT = 0.25
|
||||
EDGE_ASYMMETRY_DAMPING = 0.35
|
||||
EDGE_INWARD_ROLLING_BIAS = 0.55
|
||||
EDGE_START_COP_THRESHOLD = 0.45
|
||||
EDGE_START_BIAS_WEIGHT = 1.1
|
||||
ROLLING_FRICTION_ALPHA = 0.68
|
||||
ROLLING_FRICTION_MIN_MAGNITUDE = 0.05
|
||||
|
||||
def __init__(self):
|
||||
self.reset_all()
|
||||
|
||||
def reset_all(self):
|
||||
self.baseline_frame = None
|
||||
self.reset_contact_state()
|
||||
|
||||
def reset_contact_state(self):
|
||||
self.contact_active = False
|
||||
self.contact_enter_counter = 0
|
||||
self.contact_exit_counter = 0
|
||||
self.anchor_cop_x = None
|
||||
self.anchor_cop_y = None
|
||||
self.last_cop_x = None
|
||||
self.last_cop_y = None
|
||||
self.edge_start_bias_x = 0.0
|
||||
self.edge_start_bias_y = 0.0
|
||||
self.smoothed_x = 0.0
|
||||
self.smoothed_y = 0.0
|
||||
self.report_active = False
|
||||
self.report_hold_counter = 0
|
||||
self.held_report = None
|
||||
|
||||
def update(self, adc_data, timestamp_ms: float) -> LocalForceResult:
|
||||
raw = np.asarray(adc_data, dtype=np.float32).flatten()
|
||||
if len(raw) != ADC_LEN:
|
||||
raise ValueError(f"ADC data length must be {ADC_LEN}")
|
||||
|
||||
baseline_subtracted = self._subtract_baseline(raw)
|
||||
if not self._update_contact_state(raw, baseline_subtracted):
|
||||
return self._inactive_result(float(np.sum(baseline_subtracted)), float(np.max(baseline_subtracted, initial=0.0)))
|
||||
|
||||
stats = self._compute_contact_stats(baseline_subtracted)
|
||||
if stats is None:
|
||||
return self._stabilize_report(self._weak_contact_result(float(np.sum(baseline_subtracted)), float(np.max(baseline_subtracted, initial=0.0))))
|
||||
|
||||
if self.anchor_cop_x is None:
|
||||
self.anchor_cop_x = stats["cop_x"]
|
||||
self.anchor_cop_y = stats["cop_y"]
|
||||
self.last_cop_x = stats["cop_x"]
|
||||
self.last_cop_y = stats["cop_y"]
|
||||
self.edge_start_bias_x, self.edge_start_bias_y = self._edge_start_bias(stats)
|
||||
return self._stabilize_report(self._weak_contact_result(stats["total"], stats["peak"], stats["cop_x"], stats["cop_y"]))
|
||||
|
||||
anchor_x = self.anchor_cop_x
|
||||
anchor_y = self.anchor_cop_y if self.anchor_cop_y is not None else stats["cop_y"]
|
||||
last_x = self.last_cop_x if self.last_cop_x is not None else stats["cop_x"]
|
||||
last_y = self.last_cop_y if self.last_cop_y is not None else stats["cop_y"]
|
||||
|
||||
drift_x = stats["cop_x"] - anchor_x
|
||||
drift_y = stats["cop_y"] - anchor_y
|
||||
motion_x = stats["cop_x"] - last_x
|
||||
motion_y = stats["cop_y"] - last_y
|
||||
|
||||
kinematic_x = drift_x * self.DRIFT_WEIGHT + motion_x * self.MOTION_WEIGHT
|
||||
kinematic_y = drift_y * self.DRIFT_WEIGHT + motion_y * self.MOTION_WEIGHT
|
||||
asymmetry_x, asymmetry_y = self._damp_edge_asymmetry(
|
||||
stats,
|
||||
kinematic_x + self.edge_start_bias_x,
|
||||
kinematic_y + self.edge_start_bias_y,
|
||||
)
|
||||
|
||||
combined_x = asymmetry_x + kinematic_x + self.edge_start_bias_x
|
||||
combined_y = asymmetry_y + kinematic_y + self.edge_start_bias_y
|
||||
combined_x, combined_y = self._apply_rolling_friction(
|
||||
self.smoothed_x,
|
||||
self.smoothed_y,
|
||||
combined_x,
|
||||
combined_y,
|
||||
)
|
||||
|
||||
self.smoothed_x += (combined_x - self.smoothed_x) * self.VECTOR_SMOOTHING_ALPHA
|
||||
self.smoothed_y += (combined_y - self.smoothed_y) * self.VECTOR_SMOOTHING_ALPHA
|
||||
self.last_cop_x = stats["cop_x"]
|
||||
self.last_cop_y = stats["cop_y"]
|
||||
|
||||
planar_x = self.smoothed_x
|
||||
planar_y = -self.smoothed_y
|
||||
angle, magnitude = self.compute_vector_angle(planar_x, planar_y)
|
||||
|
||||
active_span_rows = (stats["max_row"] - stats["min_row"] + 1) / SENSOR_ROWS
|
||||
active_span_cols = (stats["max_col"] - stats["min_col"] + 1) / SENSOR_COLS
|
||||
activity = min(max(stats["active_cells"] / ADC_LEN, 0.0), 1.0)
|
||||
span = min(max((active_span_rows + active_span_cols) * 0.5, 0.0), 1.0)
|
||||
pressure_ratio = min(max(stats["active_total"] / max(stats["total"], 1.0), 0.0), 1.0)
|
||||
peak_ratio = min(max(stats["peak"] / (stats["total"] / stats["active_cells"] + 1.0), 0.0), 1.0)
|
||||
confidence = min(max(activity * 0.35 + span * 0.2 + pressure_ratio * 0.3 + peak_ratio * 0.15, 0.0), 1.0)
|
||||
|
||||
return self._stabilize_report(LocalForceResult(
|
||||
angle=angle,
|
||||
magnitude=magnitude,
|
||||
planar_x=planar_x,
|
||||
planar_y=planar_y,
|
||||
confidence=confidence,
|
||||
contact_active=True,
|
||||
reportable=False,
|
||||
total_pressure=stats["total"],
|
||||
peak=stats["peak"],
|
||||
cop_x=stats["cop_x"],
|
||||
cop_y=stats["cop_y"],
|
||||
threshold=self.CONTACT_ENTER_TOTAL_THRESHOLD,
|
||||
))
|
||||
|
||||
def _update_idle_baseline(self, raw_frame, alpha: float):
|
||||
if self.baseline_frame is None:
|
||||
self.baseline_frame = np.array(raw_frame, dtype=np.float32).copy()
|
||||
return
|
||||
self.baseline_frame += (raw_frame - self.baseline_frame) * alpha
|
||||
|
||||
def _subtract_baseline(self, raw_frame):
|
||||
if self.baseline_frame is None:
|
||||
self._update_idle_baseline(raw_frame, self.BASELINE_BOOTSTRAP_ALPHA)
|
||||
diff = raw_frame - self.baseline_frame - self.BASELINE_NOISE_FLOOR
|
||||
return np.clip(diff, 0, None)
|
||||
|
||||
def _pressure_metrics(self, frame):
|
||||
return float(np.sum(frame)), float(np.max(frame, initial=0.0))
|
||||
|
||||
def _update_contact_state(self, raw_frame, frame) -> bool:
|
||||
total, peak = self._pressure_metrics(frame)
|
||||
enter = total >= self.CONTACT_ENTER_TOTAL_THRESHOLD and peak >= self.CONTACT_ENTER_PEAK_THRESHOLD
|
||||
exit_frame = total <= self.CONTACT_EXIT_TOTAL_THRESHOLD or peak <= self.CONTACT_EXIT_PEAK_THRESHOLD
|
||||
|
||||
if self.contact_active:
|
||||
if exit_frame:
|
||||
self.contact_exit_counter += 1
|
||||
if self.contact_exit_counter >= self.CONTACT_EXIT_FRAMES_REQUIRED:
|
||||
self._update_idle_baseline(raw_frame, self.BASELINE_IDLE_ALPHA)
|
||||
self.reset_contact_state()
|
||||
return False
|
||||
else:
|
||||
self.contact_exit_counter = 0
|
||||
return True
|
||||
|
||||
if enter:
|
||||
self.contact_enter_counter += 1
|
||||
if self.contact_enter_counter >= self.CONTACT_ENTER_FRAMES_REQUIRED:
|
||||
self.contact_active = True
|
||||
self.contact_enter_counter = 0
|
||||
self.contact_exit_counter = 0
|
||||
return True
|
||||
return False
|
||||
|
||||
self.contact_enter_counter = 0
|
||||
self._update_idle_baseline(raw_frame, self.BASELINE_IDLE_ALPHA)
|
||||
return False
|
||||
|
||||
def _compute_contact_stats(self, frame):
|
||||
total, peak = self._pressure_metrics(frame)
|
||||
if total <= 0.0 or peak <= 0.0:
|
||||
return None
|
||||
|
||||
active_threshold = max(peak * self.ACTIVE_CELL_PEAK_RATIO, self.ACTIVE_CELL_MIN_VALUE)
|
||||
frame2d = np.asarray(frame, dtype=np.float32).reshape(SENSOR_ROWS, SENSOR_COLS)
|
||||
active_mask = frame2d >= active_threshold
|
||||
active_cells = int(np.count_nonzero(active_mask))
|
||||
if active_cells < self.MIN_ACTIVE_CELLS:
|
||||
return None
|
||||
|
||||
active_values = frame2d[active_mask]
|
||||
active_total = float(np.sum(active_values))
|
||||
if active_total <= 0.0:
|
||||
return None
|
||||
|
||||
rows, cols = np.nonzero(active_mask)
|
||||
cop_x = float(np.sum(active_values * cols) / active_total)
|
||||
cop_y = float(np.sum(active_values * rows) / active_total)
|
||||
min_row, max_row = int(np.min(rows)), int(np.max(rows))
|
||||
min_col, max_col = int(np.min(cols)), int(np.max(cols))
|
||||
bbox_center_x = (min_col + max_col) * 0.5
|
||||
bbox_center_y = (min_row + max_row) * 0.5
|
||||
half_width = max(max_col - min_col, 1) * 0.5
|
||||
half_height = max(max_row - min_row, 1) * 0.5
|
||||
asymmetry_x = float(np.sum(active_values * ((cols - bbox_center_x) / half_width)) / active_total)
|
||||
asymmetry_y = float(np.sum(active_values * ((rows - bbox_center_y) / half_height)) / active_total)
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"peak": peak,
|
||||
"active_total": active_total,
|
||||
"active_cells": active_cells,
|
||||
"min_row": min_row,
|
||||
"max_row": max_row,
|
||||
"min_col": min_col,
|
||||
"max_col": max_col,
|
||||
"cop_x": cop_x,
|
||||
"cop_y": cop_y,
|
||||
"asymmetry_x": asymmetry_x,
|
||||
"asymmetry_y": asymmetry_y,
|
||||
}
|
||||
|
||||
def _contact_touches_edge(self, stats) -> bool:
|
||||
return (
|
||||
stats["min_row"] == 0
|
||||
or stats["max_row"] == SENSOR_ROWS - 1
|
||||
or stats["min_col"] == 0
|
||||
or stats["max_col"] == SENSOR_COLS - 1
|
||||
)
|
||||
|
||||
def _damp_edge_asymmetry(self, stats, kinematic_x: float, kinematic_y: float):
|
||||
asymmetry_x = stats["asymmetry_x"] * self.ASYMMETRY_WEIGHT
|
||||
asymmetry_y = stats["asymmetry_y"] * self.ASYMMETRY_WEIGHT
|
||||
|
||||
if stats["min_col"] == 0 and asymmetry_x < 0.0:
|
||||
asymmetry_x = -asymmetry_x * self.EDGE_INWARD_ROLLING_BIAS
|
||||
if stats["max_col"] == SENSOR_COLS - 1 and asymmetry_x > 0.0:
|
||||
asymmetry_x = -asymmetry_x * self.EDGE_INWARD_ROLLING_BIAS
|
||||
if stats["min_row"] == 0 and asymmetry_y < 0.0:
|
||||
asymmetry_y = -asymmetry_y * self.EDGE_INWARD_ROLLING_BIAS
|
||||
if stats["max_row"] == SENSOR_ROWS - 1 and asymmetry_y > 0.0:
|
||||
asymmetry_y = -asymmetry_y * self.EDGE_INWARD_ROLLING_BIAS
|
||||
|
||||
opposing_dot = asymmetry_x * kinematic_x + asymmetry_y * kinematic_y
|
||||
kinematic_mag = float(np.hypot(kinematic_x, kinematic_y))
|
||||
if self._contact_touches_edge(stats) and opposing_dot < 0.0 and kinematic_mag >= self.ROLLING_FRICTION_MIN_MAGNITUDE:
|
||||
asymmetry_x *= self.EDGE_ASYMMETRY_DAMPING
|
||||
asymmetry_y *= self.EDGE_ASYMMETRY_DAMPING
|
||||
|
||||
return asymmetry_x, asymmetry_y
|
||||
|
||||
def _edge_start_bias(self, stats):
|
||||
center_x = (SENSOR_COLS - 1) * 0.5
|
||||
center_y = (SENSOR_ROWS - 1) * 0.5
|
||||
normalized_x = min(max((stats["cop_x"] - center_x) / max(center_x, 1.0), -1.0), 1.0)
|
||||
normalized_y = min(max((stats["cop_y"] - center_y) / max(center_y, 1.0), -1.0), 1.0)
|
||||
bias_x = self._edge_start_axis_bias(normalized_x) if stats["min_col"] == 0 or stats["max_col"] == SENSOR_COLS - 1 else 0.0
|
||||
bias_y = self._edge_start_axis_bias(normalized_y) if stats["min_row"] == 0 or stats["max_row"] == SENSOR_ROWS - 1 else 0.0
|
||||
return bias_x, bias_y
|
||||
|
||||
def _edge_start_axis_bias(self, normalized_axis: float) -> float:
|
||||
distance = abs(normalized_axis)
|
||||
if distance <= self.EDGE_START_COP_THRESHOLD:
|
||||
return 0.0
|
||||
strength = min(max((distance - self.EDGE_START_COP_THRESHOLD) / (1.0 - self.EDGE_START_COP_THRESHOLD), 0.0), 1.0)
|
||||
return -np.sign(normalized_axis) * strength * self.EDGE_START_BIAS_WEIGHT
|
||||
|
||||
def _apply_rolling_friction(self, previous_x: float, previous_y: float, current_x: float, current_y: float):
|
||||
previous_mag = float(np.hypot(previous_x, previous_y))
|
||||
current_mag = float(np.hypot(current_x, current_y))
|
||||
if previous_mag < self.ROLLING_FRICTION_MIN_MAGNITUDE or current_mag < self.ROLLING_FRICTION_MIN_MAGNITUDE:
|
||||
return current_x, current_y
|
||||
|
||||
dot = previous_x * current_x + previous_y * current_y
|
||||
if dot >= 0.0:
|
||||
return current_x, current_y
|
||||
|
||||
mixed_x = current_x * (1.0 - self.ROLLING_FRICTION_ALPHA) + previous_x * self.ROLLING_FRICTION_ALPHA
|
||||
mixed_y = current_y * (1.0 - self.ROLLING_FRICTION_ALPHA) + previous_y * self.ROLLING_FRICTION_ALPHA
|
||||
if mixed_x * previous_x + mixed_y * previous_y >= 0.0:
|
||||
return mixed_x, mixed_y
|
||||
|
||||
keep_mag = min(previous_mag, current_mag) * 0.5
|
||||
return previous_x / previous_mag * keep_mag, previous_y / previous_mag * keep_mag
|
||||
|
||||
def _inactive_result(self, total_pressure=0.0, peak=0.0):
|
||||
return LocalForceResult(0.0, 0.0, 0.0, 0.0, 0.0, False, False, total_pressure, peak, 0.0, 0.0, self.CONTACT_ENTER_TOTAL_THRESHOLD)
|
||||
|
||||
def _weak_contact_result(self, total_pressure=0.0, peak=0.0, cop_x=0.0, cop_y=0.0):
|
||||
return LocalForceResult(0.0, 0.0, 0.0, 0.0, 0.0, True, False, total_pressure, peak, cop_x, cop_y, self.CONTACT_ENTER_TOTAL_THRESHOLD)
|
||||
|
||||
def _store_report(self, result: LocalForceResult):
|
||||
result.reportable = True
|
||||
self.report_active = True
|
||||
self.report_hold_counter = 0
|
||||
self.held_report = result
|
||||
return result
|
||||
|
||||
def _hold_or_drop_report(self):
|
||||
if self.report_active and self.report_hold_counter < self.REPORT_HOLD_FRAMES and self.held_report is not None:
|
||||
self.report_hold_counter += 1
|
||||
held = self.held_report
|
||||
held.reportable = True
|
||||
return held
|
||||
self.report_active = False
|
||||
self.report_hold_counter = 0
|
||||
self.held_report = None
|
||||
return self._weak_contact_result()
|
||||
|
||||
def _stabilize_report(self, result: LocalForceResult):
|
||||
if not result.contact_active:
|
||||
self.report_active = False
|
||||
self.report_hold_counter = 0
|
||||
self.held_report = None
|
||||
return result
|
||||
|
||||
can_enter = result.magnitude >= self.REPORT_MAGNITUDE_ENTER and result.confidence >= self.REPORT_CONFIDENCE_ENTER
|
||||
can_stay = result.magnitude >= self.REPORT_MAGNITUDE_EXIT and result.confidence >= self.REPORT_CONFIDENCE_EXIT
|
||||
if self.report_active:
|
||||
if can_stay:
|
||||
return self._store_report(result)
|
||||
return self._hold_or_drop_report()
|
||||
if can_enter:
|
||||
return self._store_report(result)
|
||||
return result
|
||||
|
||||
def compute_vector_angle(self, x: float, y: float) -> Tuple[float, float]:
|
||||
magnitude = float(np.hypot(x, y))
|
||||
if magnitude <= np.finfo(np.float32).eps:
|
||||
return 0.0, 0.0
|
||||
angle = float(np.degrees(np.arctan2(y, x)))
|
||||
if angle < 0.0:
|
||||
angle += 360.0
|
||||
return angle, magnitude
|
||||
|
||||
|
||||
_estimator = LocalTangentialForceEstimator()
|
||||
|
||||
|
||||
def reset_cop_state():
|
||||
@@ -924,16 +1277,16 @@ def compute_pressure_direction(adc_data, timestamp_ms: float):
|
||||
return (
|
||||
result.cop_x,
|
||||
result.cop_y,
|
||||
result.row_min,
|
||||
result.row_max,
|
||||
result.col_min,
|
||||
result.col_max,
|
||||
result.dx,
|
||||
result.dy,
|
||||
result.base_x,
|
||||
result.base_y,
|
||||
0,
|
||||
SENSOR_ROWS - 1,
|
||||
0,
|
||||
SENSOR_COLS - 1,
|
||||
result.planar_x,
|
||||
result.planar_y,
|
||||
0.0,
|
||||
0.0,
|
||||
result.magnitude,
|
||||
result.state,
|
||||
1 if result.reportable else 0,
|
||||
result.total_pressure,
|
||||
result.threshold,
|
||||
)
|
||||
@@ -956,11 +1309,11 @@ def get_pzt_angle(adc_data, timestamp_ms: float):
|
||||
return (
|
||||
result.angle,
|
||||
result.magnitude,
|
||||
result.state,
|
||||
1 if result.reportable else 0,
|
||||
result.cop_x,
|
||||
result.cop_y,
|
||||
result.base_x,
|
||||
result.base_y,
|
||||
0.0,
|
||||
0.0,
|
||||
result.total_pressure,
|
||||
result.threshold,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user