feat(devkit): 替换切向力算法为动态阈值版本,重新编译exe

This commit is contained in:
lenn
2026-05-22 16:11:26 +08:00
parent b581e310ed
commit 030479a962
3 changed files with 141 additions and 42 deletions

View File

@@ -5,8 +5,8 @@ a = Analysis(
['sensor_server.py'],
pathex=[],
binaries=[],
datas=[('sensor_stream_pb2.py', '.'), ('sensor_stream_pb2_grpc.py', '.')],
hiddenimports=['grpc', 'openpyxl', 'numpy'],
datas=[('sensor_stream_pb2*.py', '.')],
hiddenimports=['grpc', 'openpyxl'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],

View File

@@ -248,12 +248,13 @@ class SensorPushServicer(sensor_stream_pb2_grpc.SensorPushServicer):
message = "OK"
if len(frame.matrix) == SENSOR_ROWS * SENSOR_COLS:
try:
angle = get_pzt_angle(frame.matrix)
angle, magnitude, state = get_pzt_angle(frame.matrix)
self.last_angle = angle
if self.frame_count <= 10 or self.frame_count % 30 == 0:
print(
f"[SensorPush] PZT angle frame #{frame.seq} "
f"dts={frame.dts_ms} angle={angle:.2f}"
f"dts={frame.dts_ms} angle={angle:.2f} "
f"mag={magnitude:.3f} state={state}"
)
except Exception as e:
ok = False
@@ -351,13 +352,20 @@ def serve(port: int):
import numpy as np
import threading
from collections import deque
# ===================== 算法参数=====================
TOTAL_PRESSURE_LOW_THRESHOLD = 500
COP_STABILITY_FRAMES_REQUIRED = 5
COP_INIT_MEDIAN_FRAMES = 15 # 初始COP取中位数的帧数
NOISE_COLLECT_FRAMES = 20 # 动态阈值基线采集帧数
THRESH_K = 5 # 阈值 = mean + K * std
SENSOR_ROWS = 12
SENSOR_COLS = 7
# ===================== 二次静置精修参数 =====================
POST_INIT_WINDOW_CNT = 60000
POST_INIT_STABLE_CNT = 200
POST_INIT_STABLE_THRESH = 0.1
# ===================== 线程安全全局状态 =====================
first_frame = None
first_frame_lock = threading.Lock()
@@ -366,7 +374,21 @@ first_contact_CoP_x = None
first_contact_CoP_y = None
contact_initialized = False
total_pressure_low_counter = 0
# 候选初始CoP缓冲
cop_init_x_buf = deque(maxlen=COP_INIT_MEDIAN_FRAMES)
cop_init_y_buf = deque(maxlen=COP_INIT_MEDIAN_FRAMES)
# 动态阈值
noise_sum_buf = deque(maxlen=NOISE_COLLECT_FRAMES)
dynamic_thresh = None
# 二次静置精修状态
post_init_frame_cnt = 0
post_stable_cnt = 0
post_refined_flag = False
post_cand_x = None
post_cand_y = None
# ===================== 基线减除 =====================
def subtract_baseline(current_frame):
@@ -380,37 +402,53 @@ def subtract_baseline(current_frame):
diff = current_frame - first_frame
return np.clip(diff, 0, None)
# ===================== 重置CoP状态 =====================
def reset_cop_state():
global first_contact_CoP_x, first_contact_CoP_y, contact_initialized
global total_pressure_low_counter
global post_init_frame_cnt, post_stable_cnt, post_refined_flag
global post_cand_x, post_cand_y
first_contact_CoP_x = None
first_contact_CoP_y = None
contact_initialized = False
total_pressure_low_counter = 0
cop_init_x_buf.clear()
cop_init_y_buf.clear()
post_init_frame_cnt = 0
post_stable_cnt = 0
post_refined_flag = False
post_cand_x = None
post_cand_y = None
# ===================== CoP压力中心计算 =====================
def compute_pressure_direction(baseline_subtracted_frame):
global first_contact_CoP_x, first_contact_CoP_y, contact_initialized
global total_pressure_low_counter
global post_init_frame_cnt, post_stable_cnt, post_refined_flag
global post_cand_x, post_cand_y
global noise_sum_buf, dynamic_thresh
rows, cols = SENSOR_ROWS, SENSOR_COLS
frame_flat = np.asarray(baseline_subtracted_frame, dtype=np.float32).flatten()
frame2d = frame_flat.reshape(rows, cols)
total_pressure = np.sum(frame2d)
if total_pressure < TOTAL_PRESSURE_LOW_THRESHOLD:
total_pressure_low_counter += 1
else:
total_pressure_low_counter = 0
if total_pressure_low_counter >= COP_STABILITY_FRAMES_REQUIRED:
reset_cop_state()
return 0.0, 0.0
# 动态阈值
if dynamic_thresh is None:
noise_sum_buf.append(total_pressure)
if len(noise_sum_buf) >= NOISE_COLLECT_FRAMES:
sums = np.array(noise_sum_buf)
dynamic_thresh = float(np.mean(sums) + THRESH_K * np.std(sums))
# 低压重置
if dynamic_thresh is not None and total_pressure < dynamic_thresh:
if contact_initialized:
reset_cop_state()
return 0.0, 0.0, 0, rows-1, 0, cols-1, 0.0, 0.0, 0.0, 0.0, 0.0, 0
if total_pressure == 0:
return 0.0, 0.0
return 0.0, 0.0, 0, rows-1, 0, cols-1, 0.0, 0.0, 0.0, 0.0, 0.0, 0
x_grid = np.tile(np.arange(cols), (rows, 1))
y_grid = np.repeat(np.arange(rows), cols).reshape(rows, cols)
@@ -419,16 +457,71 @@ def compute_pressure_direction(baseline_subtracted_frame):
delta_CoP_x = 0.0
delta_CoP_y = 0.0
base_x = cop_x
base_y = cop_y
# ============ 初始点稳定判断(中位数判定) ============
if not contact_initialized:
first_contact_CoP_x = cop_x
first_contact_CoP_y = cop_y
contact_initialized = True
else:
delta_CoP_x = cop_x - first_contact_CoP_x
delta_CoP_y = cop_y - first_contact_CoP_y
cop_init_x_buf.append(cop_x)
cop_init_y_buf.append(cop_y)
if len(cop_init_x_buf) >= COP_INIT_MEDIAN_FRAMES:
xs = list(cop_init_x_buf)
ys = list(cop_init_y_buf)
first_contact_CoP_x = float(np.median(xs))
first_contact_CoP_y = float(np.median(ys))
print(f"[CoP Init] 前{COP_INIT_MEDIAN_FRAMES}帧坐标:")
for i in range(len(xs)):
print(f" frame {i}: x={xs[i]:.3f}, y={ys[i]:.3f}")
print(f" 中位数: x={first_contact_CoP_x:.3f}, y={first_contact_CoP_y:.3f}")
contact_initialized = True
cop_init_x_buf.clear()
cop_init_y_buf.clear()
# ========== 计算偏移量 ==========
else:
# 二次静置精修
post_init_frame_cnt += 1
if not post_refined_flag and post_init_frame_cnt <= POST_INIT_WINDOW_CNT:
if post_cand_x is not None:
dist_val = np.hypot(cop_x - post_cand_x, cop_y - post_cand_y)
if dist_val <= POST_INIT_STABLE_THRESH:
post_stable_cnt += 1
else:
post_cand_x = cop_x
post_cand_y = cop_y
post_stable_cnt = 1
else:
post_cand_x = cop_x
post_cand_y = cop_y
post_stable_cnt = 1
if post_stable_cnt >= POST_INIT_STABLE_CNT:
first_contact_CoP_x = post_cand_x
first_contact_CoP_y = post_cand_y
post_refined_flag = True
else:
post_refined_flag = True
delta_CoP_x = cop_x - first_contact_CoP_x
delta_CoP_y = first_contact_CoP_y - cop_y
base_x = first_contact_CoP_x
base_y = first_contact_CoP_y
magnitude = np.hypot(delta_CoP_x, delta_CoP_y)
if not contact_initialized:
state = 0
elif not post_refined_flag:
state = 1
else:
state = 2
return (cop_x, cop_y,
0, rows-1, 0, cols-1,
delta_CoP_x, delta_CoP_y,
base_x, base_y,
magnitude, state)
return delta_CoP_x, delta_CoP_y
# ===================== 角度计算核心 =====================
def compute_vector_angle(x: float, y: float) -> tuple[float, float]:
@@ -440,23 +533,29 @@ def compute_vector_angle(x: float, y: float) -> tuple[float, float]:
return angle, mag
def compute_PZT_angle(Px: float, Py: float) -> tuple[float, float]:
return compute_vector_angle(Px, -Py)
return compute_vector_angle(Px, Py)
# ===================== 核心入口函数 =====================
def get_pzt_angle(adc_data):
if len(adc_data) != 84:
raise ValueError("ADC数据长度必须为84")
baseline_subtracted = subtract_baseline(adc_data)
dx, dy = compute_pressure_direction(baseline_subtracted)
result = compute_pressure_direction(baseline_subtracted)
dx, dy = result[6], result[7]
magnitude = result[10]
state = int(result[11])
pzt_angle, _ = compute_PZT_angle(dx, dy)
return pzt_angle, magnitude, state
return pzt_angle
# ===================== 重置基线(校准用) =====================
def reset_baseline():
global first_frame
global first_frame, dynamic_thresh
with first_frame_lock:
first_frame = None
dynamic_thresh = None
noise_sum_buf.clear()
reset_cop_state()