127 lines
3.9 KiB
Python
127 lines
3.9 KiB
Python
"""
|
||
独立测试脚本:读取84个原始ADC数据,传入CoP算法计算角度,终端打印结果。
|
||
|
||
用法:
|
||
python test_pzt.py # 从 stdin 逐行读取(每行84个逗号分隔数值)
|
||
python test_pzt.py data.csv # 从 CSV 文件逐行读取
|
||
python test_pzt.py --random # 生成随机测试数据(调试用)
|
||
"""
|
||
|
||
import sys
|
||
import csv
|
||
import numpy as np
|
||
|
||
# ── 从 sensor_server.py 导入算法 ──
|
||
sys.path.insert(0, ".")
|
||
from sensor_server import (
|
||
get_pzt_angle,
|
||
reset_baseline,
|
||
subtract_baseline,
|
||
compute_pressure_direction,
|
||
compute_PZT_angle,
|
||
)
|
||
|
||
|
||
def print_result(data_label: str, pzt_angle: float, magnitude: float, state: int, cop_x: float, cop_y: float, base_x: float, base_y: float):
|
||
dx = cop_x - base_x
|
||
dy = base_y - cop_y
|
||
print(
|
||
f"devkit: angle={pzt_angle:.2f}, magnitude={magnitude:.4f}, state={state}, "
|
||
f"cop_x={cop_x:.4f}, cop_y={cop_y:.4f}, dx={dx:.4f}, dy={dy:.4f}"
|
||
)
|
||
|
||
|
||
def process_values(values: list[int | float]):
|
||
"""处理一帧84个值并打印结果"""
|
||
if len(values) != 84:
|
||
print(f"[ERROR] 期望84个值,实际收到 {len(values)} 个", file=sys.stderr)
|
||
return
|
||
|
||
try:
|
||
pzt_angle, magnitude, state, cop_x, cop_y, base_x, base_y = get_pzt_angle(values)
|
||
print_result("", pzt_angle, magnitude, state, cop_x, cop_y, base_x, base_y)
|
||
except Exception as e:
|
||
print(f"[ERROR] 计算失败: {e}", file=sys.stderr)
|
||
|
||
|
||
def run_random_test():
|
||
"""生成随机数据测试算法"""
|
||
reset_baseline()
|
||
print("[TEST] 使用随机数据测试 CoP 算法")
|
||
print("[TEST] 先用全零帧建立基线...")
|
||
process_values([0] * 84)
|
||
print("[TEST] 模拟右侧偏移按压...")
|
||
# 模拟:row 5-7, col 4-6 区域有压力
|
||
data = [0.0] * 84
|
||
for r in range(5, 8):
|
||
for c in range(4, 7):
|
||
idx = r * 7 + c
|
||
data[idx] = 100.0 + (c - 4) * 50 # 右侧更强
|
||
process_values(data)
|
||
print("[TEST] 模拟下方偏移按压...")
|
||
data2 = [0.0] * 84
|
||
for r in range(8, 11):
|
||
for c in range(2, 5):
|
||
idx = r * 7 + c
|
||
data2[idx] = 150.0 + (r - 8) * 30
|
||
process_values(data2)
|
||
print("[TEST] 完成")
|
||
|
||
|
||
def run_csv_mode(filepath: str):
|
||
"""从 CSV 文件逐行读取并处理"""
|
||
reset_baseline()
|
||
print(f"[CSV] 读取文件: {filepath}")
|
||
with open(filepath, "r", encoding="utf-8-sig", newline="") as f:
|
||
reader = csv.reader(f)
|
||
for i, row in enumerate(reader):
|
||
if not row:
|
||
continue
|
||
# 跳过 header
|
||
if row[0].strip() in ("seq", "timestamp_ms"):
|
||
print(f"[CSV] 跳过 header: {row[:5]}...")
|
||
continue
|
||
try:
|
||
values = [float(v) for v in row]
|
||
if len(values) == 84:
|
||
process_values(values)
|
||
elif len(values) > 84:
|
||
process_values(values[:84])
|
||
except ValueError:
|
||
continue
|
||
|
||
|
||
def run_stdin_mode():
|
||
"""从 stdin 逐行读取"""
|
||
reset_baseline()
|
||
print("[STDIN] 等待输入(每行84个逗号分隔数值,Ctrl+C 退出)...")
|
||
try:
|
||
for line in sys.stdin:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
try:
|
||
values = [float(v) for v in line.split(",")]
|
||
if len(values) >= 84:
|
||
process_values(values[:84])
|
||
except ValueError:
|
||
continue
|
||
except KeyboardInterrupt:
|
||
print("\n[STDIN] 已退出")
|
||
|
||
|
||
def main():
|
||
if len(sys.argv) > 1:
|
||
arg = sys.argv[1]
|
||
if arg == "--random":
|
||
run_random_test()
|
||
elif arg == "--help" or arg == "-h":
|
||
print(__doc__)
|
||
else:
|
||
run_csv_mode(arg)
|
||
else:
|
||
run_stdin_mode()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |