From 60f9ad15e7f63bd3f71419f0c4094b8c4e16a8bc Mon Sep 17 00:00:00 2001 From: lenn Date: Tue, 5 May 2026 16:48:16 +0800 Subject: [PATCH] Build SDK core streaming skeleton --- docs/PROGRESS.md | 346 +++++++++++++++++++++++++++++++++++++++++++++++ src/channel.rs | 91 +++++++------ src/device.rs | 203 +++++++++++++++++++++++++-- src/register.rs | 81 ++++++++++- src/stream.rs | 296 +++++++++++++++++++++++++++++++++++++++- src/transport.rs | 3 + 6 files changed, 963 insertions(+), 57 deletions(-) create mode 100644 docs/PROGRESS.md diff --git a/docs/PROGRESS.md b/docs/PROGRESS.md new file mode 100644 index 0000000..5be375b --- /dev/null +++ b/docs/PROGRESS.md @@ -0,0 +1,346 @@ +# Eskin Finger SDK Progress + +本文件记录当前代码骨架进度、已完成设计决策、已知问题和后续实现顺序。 + +## 当前状态 + +当前 SDK 已经形成如下分层: + +```text +Device API + -> Stream Runtime / Channel + -> Register Access + -> Protocol Codec + -> Serial Transport + -> Hardware +``` + +当前 `cargo check` 可以通过,但还有 `stream.rs` 中的两个 warning 需要清理。 + +## 已完成 + +### 1. Protocol Layer + +文件:`src/protocol.rs` + +已完成: + +- 读请求编码:`encode_read_request` +- 写请求编码:`encode_write_request` +- 读应答解码:`decode_read_response` +- 写应答解码:`decode_write_response` +- stream frame 解码入口:`decode_stream_frame` +- CRC 校验 +- 设备状态码转换 +- response 帧长度校验 + +当前协议约定: + +```text +response frame = header + payload/status data + status(1B) + crc(1B) +crc 是最后 1 字节 +status 是 crc 前 1 字节 +``` + +注意: + +- `FRAME_START_RESPONSE = 0xAA55` +- 当前 `device.rs` 和 `stream.rs` 读取 response 起始符时使用 `u16::from_be_bytes([header[0], header[1]])` +- 如果真实设备返回字节序为 `55 AA`,这里需要改为小端;如果真实返回 `AA 55`,当前逻辑正确 + +### 2. Transport Layer + +文件:`src/transport.rs` + +已完成: + +- `SerialTransport` trait +- `SerialPortTransport` +- 串口 open/close/is_open +- write/read/flush_rx +- timeout 转换 +- serialport error 到 `SdkError` 的转换 +- `SharedSerialTransport = Arc>>` + +设计决策: + +- `SerialTransport: Send`,不要求 `Sync` +- 跨线程共享通过 `Arc>` 完成 +- 串口 request/response 应在同一把 mutex lock 内完成,避免多线程串帧 + +### 3. Device Layer + +文件:`src/device.rs` + +已完成: + +- `DeviceState` +- `EskinDeviceInner` +- `EskinDevice` trait +- `open/close` +- `start_stream/stop_stream` +- `read_sample/read_event` +- 同步 `read_register/write_register` +- `ensure_open` +- 共享 channel:`Arc` +- 共享 transport:`SharedSerialTransport` +- `create_stream_runtime` +- `shared_transport` + +当前行为: + +- `read_register/write_register` 会: + - 检查设备状态 + - protocol encode request + - lock transport + - flush rx + - write request + - read full response frame + - protocol decode response + +注意: + +- `device.start_stream()` 和 `StreamRuntime::start()` 当前都会发送 `StreamStarted` +- 后续需要明确 stream 状态由谁统一管理,避免重复事件 + +### 4. Channel Layer + +文件:`src/channel.rs` + +已完成: + +- `DeviceCommand` +- `DeviceEvent` +- `ChannelManager` +- sample/cmd/event 三类 channel +- `send_sample/recv_sample` +- `send_cmd/recv_cmd` +- `send_event/recv_event` +- `dropped_count/reset_dropped_count` +- sample drop policy: + - `DropNewest` + - `DropOldest` + +设计决策: + +- `dropped_samples` 只统计 sample drop,不统计 command/event +- channel timeout 和 disconnected 分别映射为: + - `SdkError::Timeout` + - `SdkError::ChannelClosed` +- sample channel 满时根据 drop policy 处理,不作为硬错误 + +### 5. Stream Layer + +文件:`src/stream.rs` + +已完成: + +- `StreamMode` +- `StreamConfig` +- `StreamController` +- `StreamRuntime` +- `StreamWorker` +- worker thread 生命周期: + - `start()` spawn worker + - `stop()` stop flag + join +- `SampleCollector` trait +- `NoopSampleCollector` +- `PollingSampleCollector` 骨架 +- polling collector 已具备同步 `read_register` 能力 +- polling collector 当前会尝试读取: + - `REG_COMBINED_FORCE` + - `REG_MODULE_ERROR` + +当前行为: + +```text +StreamRuntime::start() + -> make_collector() + -> spawn StreamWorker + -> worker loop + -> collector.collect_once() + -> if Some(sample), send_sample(sample) +``` + +当前 `PollingSampleCollector::collect_once()` 只读取 raw bytes,尚未解析为 `FingerSample`,因此返回 `Ok(None)`。 + +已知 warning: + +- `src/stream.rs` unused import: `transport::{self, ...}` 中的 `self` +- `StreamWorker::new` 参数 `transport` 未使用 + +### 6. Register Layer + +文件:`src/register.rs` + +已完成: + +- 寄存器地址常量 +- `RegisterSpec` +- `RegisterAccess` +- `RegisterValueType` +- `DEVICE_INFO_REGISTERS` +- `RegisterMap` trait +- `EskinRegisterMap` +- `parse_distribution_force` + +暂未完成: + +- `distribution_register` +- `parse_device_info` +- combined force 解析 +- module error 解析 + +## 当前主要设计决策 + +### Transport 共享模型 + +当前使用: + +```rust +Arc>> +``` + +原因: + +- device 和 stream worker 需要共享同一个串口 +- 串口读写需要 `&mut self` +- mutex 保证一次 request/response 不被其他线程打断 + +长期建议: + +- stream running 时,尽量由 worker 独占串口访问 +- 主线程通过 command channel 请求 worker 操作设备 +- 避免主线程同步 `read_register` 和 worker polling 同时抢 transport + +### Stream 职责拆分 + +当前拆分: + +```text +StreamRuntime + 管理 start/stop、worker handle、对外读取 sample/event + +StreamWorker + 管理 loop、running flag、sleep、错误事件 + +SampleCollector + 管理一次采集,后续负责协议读写和 sample 构建 +``` + +这是推荐方向。worker 不应该直接塞满协议和寄存器解析逻辑。 + +## 明确下一步 + +### Step 1: 清理当前 warning + +文件:`src/stream.rs` + +处理: + +- 删除 unused import 中的 `self` + +```rust +transport::{SerialTransport, SharedSerialTransport} +``` + +- `StreamWorker::new` 当前参数 `transport` 未使用。二选一: + - 删除 `StreamWorker` 中的 transport 参数,因为 collector 已持有 transport + - 或者让 worker 持有 transport,但不推荐,职责重复 + +推荐:删除 `StreamWorker::new` 的 `transport` 参数。 + +### Step 2: 完善 Register 解析接口 + +文件:`src/register.rs` + +新增解析函数: + +- `parse_combined_forces(raw: &[u8]) -> Result, SdkError>` +- `parse_module_errors(raw: &[u8]) -> Result, SdkError>` + +依据当前寄存器表: + +```text +REG_COMBINED_FORCE = 0x0500 +长度 168B = 28 modules * 6B +每个 module = fx:i16 + fy:i16 + fz:i16 + +REG_MODULE_ERROR = 0x0700 +长度 56B = 28 modules * 2B +每个 module = error_code:u16 +``` + +### Step 3: 让 PollingSampleCollector 产出 FingerSample + +文件:`src/stream.rs` + +在 `PollingSampleCollector::collect_once()` 中: + +```text +1. sequence = next_sequence() +2. read REG_COMBINED_FORCE +3. read REG_MODULE_ERROR +4. register parse raw bytes +5. build FingerSample +6. return Ok(Some(sample)) +``` + +先不处理 distribution force。 + +### Step 4: 补 distribution force + +文件:`src/register.rs`、`src/stream.rs` + +前置条件: + +- `distribution_register(module)` 能根据 `SensorModule` 返回地址和长度 +- 需要确认每个 module 的分布力长度来源 + +实现策略: + +- `StreamConfig.read_distribution == false` 时跳过 +- `StreamConfig.modules` 为空时默认读所有模块,或者默认不读;需要明确语义 + +### Step 5: 统一 stream 状态入口 + +文件:`src/device.rs`、`src/stream.rs` + +当前重复点: + +- `device.start_stream()` 发 `StreamStarted` +- `StreamRuntime::start()` 也发 `StreamStarted` + +需要选择一个主入口: + +推荐: + +```text +device.open() +let mut stream = device.create_stream_runtime() +stream.start(config) +stream.next_sample() +stream.stop() +``` + +如果最终 SDK 希望用户只调用 `device.start_stream()`,则 `EskinDeviceInner` 需要持有 `StreamRuntime` 或 worker handle。 + +### Step 6: 增加基础测试 + +建议先加这些测试: + +- protocol encode read request golden bytes +- protocol encode write request golden bytes +- CRC 校验 +- register parse combined force +- register parse module error +- ChannelManager drop policy + +## 当前风险点 + +1. response 起始符字节序仍需真实设备帧确认。 +2. stream worker 和 device 同步 read/write 共享同一 transport,虽然 mutex 安全,但业务上仍可能抢响应。 +3. `PollingSampleCollector` 已读取 raw bytes,但还未构建 sample。 +4. `register.rs` 的 `parse_device_info` 和 `distribution_register` 仍是 `todo!()`。 +5. `StreamStarted/StreamStopped` 事件存在重复来源,需要统一入口。 + diff --git a/src/channel.rs b/src/channel.rs index 6531c56..85274bb 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -3,7 +3,7 @@ use crate::{ error::SdkError, types::{FingerSample, SensorModule}, }; -use crossbeam_channel::{Receiver, Sender, bounded}; +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, TrySendError, bounded}; use std::sync::atomic::{AtomicU64, Ordering}; #[derive(Debug, Clone)] @@ -71,23 +71,33 @@ impl ChannelManager { } } - pub fn send_sample(&self, sample: FingerSample) { + pub fn send_sample(&self, sample: FingerSample) -> Result<(), SdkError> { match self.drop_policy { - DropPolicy::DropNewest => { - if self.sample_tx.try_send(sample).is_err() { - self.dropped_samples.fetch_add(1, Ordering::Relaxed); + DropPolicy::DropNewest => match self.sample_tx.try_send(sample) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => { + self.record_sample_drop(); + Ok(()) } - } - DropPolicy::DropOldest => { - if let Err(crossbeam_channel::TrySendError::Full(captured)) = - self.sample_tx.try_send(sample) - { + Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed), + }, + DropPolicy::DropOldest => match self.sample_tx.try_send(sample) { + Ok(()) => Ok(()), + Err(TrySendError::Full(sample)) => { let _ = self.sample_rx.try_recv(); - if self.sample_tx.try_send(captured).is_err() { - self.dropped_samples.fetch_add(1, Ordering::Relaxed); + self.record_sample_drop(); + + match self.sample_tx.try_send(sample) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => { + self.record_sample_drop(); + Ok(()) + } + Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed), } } - } + Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed), + }, } } @@ -95,50 +105,53 @@ impl ChannelManager { let timeout = std::time::Duration::from_millis(timeout_ms as u64); self.sample_rx .recv_timeout(timeout) - .map_err(|_| SdkError::Timeout) + .map_err(|err| match err { + RecvTimeoutError::Timeout => SdkError::Timeout, + RecvTimeoutError::Disconnected => SdkError::ChannelClosed, + }) } - pub fn send_cmd(&self, cmd: DeviceCommand) { - match self.drop_policy { - DropPolicy::DropNewest => { - if self.cmd_tx.try_send(cmd).is_err() { - self.dropped_samples.fetch_add(1, Ordering::Relaxed); - } - } - DropPolicy::DropOldest => { - if let Err(crossbeam_channel::TrySendError::Full(captured)) = - self.cmd_tx.try_send(cmd) - { - let _ = self.cmd_rx.try_recv(); - if self.cmd_tx.try_send(captured).is_err() { - self.dropped_samples.fetch_add(1, Ordering::Relaxed); - } - } - } - } + pub fn send_cmd(&self, cmd: DeviceCommand) -> Result<(), SdkError> { + self.cmd_tx.try_send(cmd).map_err(|err| match err { + TrySendError::Full(_) => SdkError::BufferOverflow(1), + TrySendError::Disconnected(_) => SdkError::ChannelClosed, + }) } pub fn recv_cmd(&self, timeout_ms: u32) -> Result { let timeout = std::time::Duration::from_millis(timeout_ms as u64); - self.cmd_rx - .recv_timeout(timeout) - .map_err(|_| SdkError::Timeout) + self.cmd_rx.recv_timeout(timeout).map_err(|err| match err { + RecvTimeoutError::Timeout => SdkError::Timeout, + RecvTimeoutError::Disconnected => SdkError::ChannelClosed, + }) } pub fn dropped_count(&self) -> u64 { self.dropped_samples.load(Ordering::Relaxed) } + pub fn reset_dropped_count(&self) { + self.dropped_samples.store(0, Ordering::Relaxed); + } + + fn record_sample_drop(&self) -> u64 { + self.dropped_samples.fetch_add(1, Ordering::Relaxed) + 1 + } + pub fn send_event(&self, event: DeviceEvent) -> Result<(), SdkError> { - self.event_tx - .try_send(event) - .map_err(|_| SdkError::ChannelClosed) + self.event_tx.try_send(event).map_err(|err| match err { + TrySendError::Full(_) => SdkError::BufferOverflow(1), + TrySendError::Disconnected(_) => SdkError::ChannelClosed, + }) } pub fn recv_event(&self, timeout_ms: u32) -> Result { let timeout = std::time::Duration::from_millis(timeout_ms as u64); self.event_rx .recv_timeout(timeout) - .map_err(|_| SdkError::Timeout) + .map_err(|err| match err { + RecvTimeoutError::Timeout => SdkError::Timeout, + RecvTimeoutError::Disconnected => SdkError::ChannelClosed, + }) } } diff --git a/src/device.rs b/src/device.rs index 353fcd5..9b43bce 100644 --- a/src/device.rs +++ b/src/device.rs @@ -1,13 +1,16 @@ -use std::time::Duration; - use chrono::Duration; +use std::sync::{Arc, Mutex, MutexGuard}; use crate::{ channel::{ChannelManager, DeviceEvent}, config::{DeviceConfig, DeviceInfo}, error::SdkError, - protocol::{EskinProtocolCodec, ProtocolCodec}, - transport::SerialTransport, + protocol::{ + EskinProtocolCodec, FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ProtocolCodec, + ReadRequest, WriteRequest, + }, + stream::StreamRuntime, + transport::{SerialTransport, SharedSerialTransport}, types::FingerSample, }; @@ -22,9 +25,9 @@ pub enum DeviceState { pub struct EskinDeviceInner { pub info: DeviceInfo, pub config: DeviceConfig, - pub channels: ChannelManager, + pub channels: Arc, pub state: DeviceState, - pub transport: Box, + pub transport: SharedSerialTransport, pub codec: Box, } @@ -40,7 +43,25 @@ impl EskinDeviceInner { Self { info: DeviceInfo::default(), config, - channels, + channels: Arc::new(channels), + state: DeviceState::Closed, + transport: Arc::new(Mutex::new(transport)), + codec: Box::new(EskinProtocolCodec), + } + } + + pub fn new_shared(config: DeviceConfig, transport: SharedSerialTransport) -> Self { + let channels = ChannelManager::new( + config.sample_capacity, + config.command_capacity, + config.event_capacity, + config.drop_policy, + ); + + Self { + info: DeviceInfo::default(), + config, + channels: Arc::new(channels), state: DeviceState::Closed, transport, codec: Box::new(EskinProtocolCodec), @@ -48,13 +69,13 @@ impl EskinDeviceInner { } fn read_exact_from_transport( - &mut self, + transport: &mut dyn SerialTransport, buf: &mut [u8], timeout: Duration, ) -> Result<(), SdkError> { let mut offset = 0; while offset < buf.len() { - let n = self.transport.read(&mut buf[offset..], timeout)?; + let n = transport.read(&mut buf[offset..], timeout)?; if n == 0 { return Err(SdkError::Timeout); @@ -66,10 +87,54 @@ impl EskinDeviceInner { Ok(()) } - fn read_response_frame(&mut self) -> Result, SdkError> { - let timeout = Duration::from_millis(self.config.read_timeout_ms as u64); + fn read_response_frame_from( + &self, + transport: &mut dyn SerialTransport, + ) -> Result, SdkError> { + let timeout = Duration::milliseconds(self.config.read_timeout_ms as i64); let mut header = [0u8; 4]; - self.read_exact_from_transport(&mut header, timeout)?; + Self::read_exact_from_transport(transport, &mut header, timeout)?; + + let start = u16::from_be_bytes([header[0], header[1]]); + if start != FRAME_START_RESPONSE { + return Err(SdkError::FrameError(format!( + "invalid response start: 0x{start:04X}" + ))); + } + let data_len = u16::from_le_bytes([header[2], header[3]]) as usize; + let total_len = 4 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN; + + let mut frame = vec![0u8; total_len]; + frame[..4].copy_from_slice(&header); + Self::read_exact_from_transport(transport, &mut frame[4..], timeout)?; + + Ok(frame) + } + + fn lock_transport(&self) -> Result>, SdkError> { + self.transport + .lock() + .map_err(|_| SdkError::InternalError("transport mutex poisoned".into())) + } + + fn ensure_open(&self) -> Result<(), SdkError> { + match self.state { + DeviceState::Open | DeviceState::Streaming => Ok(()), + DeviceState::Closed => Err(SdkError::NotInitialized), + DeviceState::Error => Err(SdkError::InternalError("device is in error state".into())), + } + } + + pub fn channels(&self) -> Arc { + Arc::clone(&self.channels) + } + + pub fn create_stream_runtime(&self) -> StreamRuntime { + StreamRuntime::new(Arc::clone(&self.channels), Arc::clone(&self.transport)) + } + + pub fn shared_transport(&self) -> SharedSerialTransport { + Arc::clone(&self.transport) } } @@ -87,3 +152,117 @@ pub trait EskinDevice { fn read_register(&mut self, addr: u32, length: u16) -> Result, SdkError>; fn write_register(&mut self, addr: u32, data: &[u8]) -> Result; } + +impl EskinDevice for EskinDeviceInner { + fn open(&mut self) -> Result<(), SdkError> { + { + let mut transport = self.lock_transport()?; + transport.open()?; + transport.flush_rx()?; + } + self.state = DeviceState::Open; + Ok(()) + } + + fn close(&mut self) -> Result<(), SdkError> { + { + let mut transport = self.lock_transport()?; + transport.close()?; + } + self.state = DeviceState::Closed; + Ok(()) + } + + fn state(&self) -> DeviceState { + self.state + } + + fn device_info(&self) -> Result { + Ok(self.info.clone()) + } + + fn config(&self) -> &DeviceConfig { + &self.config + } + + fn apply_config(&mut self, config: DeviceConfig) -> Result<(), SdkError> { + self.config = config; + Ok(()) + } + + fn start_stream(&mut self) -> Result<(), SdkError> { + if self.state == DeviceState::Streaming { + return Err(SdkError::AlreadyStreaming); + } + + if self.state != DeviceState::Open { + return Err(SdkError::NotInitialized); + } + + self.state = DeviceState::Streaming; + self.channels.send_event(DeviceEvent::StreamStarted)?; + Ok(()) + } + + fn stop_stream(&mut self) -> Result<(), SdkError> { + if self.state != DeviceState::Streaming { + return Err(SdkError::NotStreaming); + } + + self.state = DeviceState::Open; + self.channels.send_event(DeviceEvent::StreamStopped)?; + Ok(()) + } + + fn read_sample(&self, timeout_ms: u32) -> Result { + self.channels.recv_sample(timeout_ms) + } + + fn read_event(&self, timeout_ms: u32) -> Result { + self.channels.recv_event(timeout_ms) + } + + fn read_register(&mut self, addr: u32, length: u16) -> Result, SdkError> { + self.ensure_open()?; + + let request = ReadRequest { + device_addr: self.config.device_addr, + start_addr: addr, + read_byte_count: length, + }; + + let request_frame = self.codec.encode_read_request(&request)?; + + let response_frame = { + let mut transport = self.lock_transport()?; + transport.flush_rx()?; + transport.write(&request_frame)?; + self.read_response_frame_from(transport.as_mut())? + }; + let response = self.codec.decode_read_response(&response_frame)?; + + Ok(response.data) + } + + fn write_register(&mut self, addr: u32, data: &[u8]) -> Result { + self.ensure_open()?; + + let request = WriteRequest { + device_addr: self.config.device_addr, + start_addr: addr, + data: data.to_vec(), + }; + + let request_frame = self.codec.encode_write_request(&request)?; + + let response_frame = { + let mut transport = self.lock_transport()?; + transport.flush_rx()?; + transport.write(&request_frame)?; + self.read_response_frame_from(transport.as_mut())? + }; + let response = self.codec.decode_write_response(&response_frame)?; + + Ok(response.return_byte_count) + } +} diff --git a/src/register.rs b/src/register.rs index 5dc04cf..a9569b9 100644 --- a/src/register.rs +++ b/src/register.rs @@ -1,7 +1,7 @@ use crate::{ config::DeviceInfo, error::SdkError, - types::{DistributionForce, SensorModule}, + types::{DistributionForce, ForcePoint, SensorModule}, }; pub const REG_SERIAL_NUMBER: u32 = 0x0000; @@ -39,6 +39,57 @@ pub struct RegisterSpec { pub value_type: RegisterValueType, } +pub const DEVICE_INFO_REGISTERS: &[RegisterSpec] = &[ + RegisterSpec { + addr: REG_SERIAL_NUMBER, + len: 4, + access: RegisterAccess::ReadOnly, + value_type: RegisterValueType::U32, + }, + RegisterSpec { + addr: REG_FIRMWARE_VERSION, + len: 2, + access: RegisterAccess::ReadOnly, + value_type: RegisterValueType::U16, + }, + RegisterSpec { + addr: REG_CALIBRATION_GROUP, + len: 2, + access: RegisterAccess::ReadOnly, + value_type: RegisterValueType::U16, + }, + RegisterSpec { + addr: REG_MODULE_ACTIVE_STATUS, + len: 2, + access: RegisterAccess::ReadOnly, + value_type: RegisterValueType::U16, + }, + RegisterSpec { + addr: REG_L_LINE, + len: 2, + access: RegisterAccess::ReadOnly, + value_type: RegisterValueType::U16, + }, + RegisterSpec { + addr: REG_H_LINE, + len: 2, + access: RegisterAccess::ReadOnly, + value_type: RegisterValueType::U16, + }, + RegisterSpec { + addr: REG_PRODUCT_CONFIG_1, + len: 4, + access: RegisterAccess::ReadWrite, + value_type: RegisterValueType::U32, + }, + RegisterSpec { + addr: REG_PRODUCT_CONFIG_2, + len: 4, + access: RegisterAccess::ReadWrite, + value_type: RegisterValueType::U32, + }, +]; + pub trait RegisterMap { fn device_info_registers(&self) -> &'static [RegisterSpec]; fn distribution_register(&self, module: SensorModule) -> Result; @@ -55,7 +106,7 @@ pub struct EskinRegisterMap; impl RegisterMap for EskinRegisterMap { fn device_info_registers(&self) -> &'static [RegisterSpec] { - todo!("device info register specs") + DEVICE_INFO_REGISTERS } fn distribution_register(&self, _module: SensorModule) -> Result { @@ -68,9 +119,29 @@ impl RegisterMap for EskinRegisterMap { fn parse_distribution_force( &self, - _module: SensorModule, - _raw: &[u8], + module: SensorModule, + raw: &[u8], ) -> Result { - todo!("parse distribution force") + if raw.len() % 3 != 0 { + return Err(SdkError::FrameError(format!( + "distribution force length must be multiple of 3, got {}", + raw.len() + ))); + } + + let points = raw + .chunks_exact(3) + .map(|chunk| ForcePoint { + fx: chunk[0] as i8, + fy: chunk[1] as i8, + fz: chunk[2] as i8, + }) + .collect::>(); + + Ok(DistributionForce { + module, + point_count: points.len() as u16, + points, + }) } } diff --git a/src/stream.rs b/src/stream.rs index 614ceb3..f4a4e94 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,9 +1,22 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +use crate::register::{REG_COMBINED_FORCE, REG_MODULE_ERROR}; +use chrono::Duration; + use crate::{ - channel::DeviceEvent, + channel::{ChannelManager, DeviceEvent}, error::SdkError, + protocol::{EskinProtocolCodec, ProtocolCodec}, + transport::{self, SerialTransport, SharedSerialTransport}, types::{FingerSample, SensorModule}, }; +use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ReadRequest}; +use std::thread::{self, JoinHandle}; +use std::time::Duration as StdDuration; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum StreamMode { Polling, @@ -16,6 +29,8 @@ pub struct StreamConfig { pub read_distribution: bool, pub modules: Vec, pub poll_interval_ms: u32, + pub device_addr: u8, + pub read_timeout_ms: u32, } impl Default for StreamConfig { @@ -25,6 +40,8 @@ impl Default for StreamConfig { read_distribution: true, modules: Vec::new(), poll_interval_ms: 10, + device_addr: 0x34, + read_timeout_ms: 100, } } } @@ -36,3 +53,280 @@ pub trait StreamController: Send { fn next_sample(&self, timeout_ms: u32) -> Result; fn next_event(&self, timeout_ms: u32) -> Result; } + +pub struct StreamRuntime { + running: Arc, + config: Option, + channels: Arc, + transport: SharedSerialTransport, + worker: Option>, +} + +impl StreamRuntime { + pub fn new(channels: Arc, transport: SharedSerialTransport) -> Self { + Self { + running: Arc::new(AtomicBool::new(false)), + config: None, + channels, + transport, + worker: None, + } + } + + pub fn running_flag(&self) -> Arc { + Arc::clone(&self.running) + } + + pub fn config(&self) -> Option<&StreamConfig> { + self.config.as_ref() + } + + pub fn transport(&self) -> SharedSerialTransport { + Arc::clone(&self.transport) + } +} + +impl StreamController for StreamRuntime { + fn start(&mut self, config: StreamConfig) -> Result<(), SdkError> { + if self.running.swap(true, Ordering::SeqCst) { + return Err(SdkError::AlreadyStreaming); + } + + let collector = make_collector(&config, Arc::clone(&self.transport)); + let worker = StreamWorker::new( + Arc::clone(&self.running), + Arc::clone(&self.channels), + Arc::clone(&self.transport), + config.clone(), + collector, + ) + .spawn(); + + self.worker = Some(worker); + self.config = Some(config); + self.channels.send_event(DeviceEvent::StreamStarted)?; + + Ok(()) + } + + fn stop(&mut self) -> Result<(), SdkError> { + if !self.running.swap(false, Ordering::SeqCst) { + return Err(SdkError::NotStreaming); + } + + self.channels.send_event(DeviceEvent::StreamStopped)?; + + if let Some(worker) = self.worker.take() { + worker + .join() + .map_err(|_| SdkError::InternalError("stream worker panicked".into()))?; + } + Ok(()) + } + + fn is_running(&self) -> bool { + self.running.load(Ordering::SeqCst) + } + + fn next_sample(&self, timeout_ms: u32) -> Result { + self.channels.recv_sample(timeout_ms) + } + + fn next_event(&self, timeout_ms: u32) -> Result { + self.channels.recv_event(timeout_ms) + } +} + +pub struct StreamWorker { + running: Arc, + channels: Arc, + config: StreamConfig, + collector: Box, +} + +impl StreamWorker { + pub fn new( + running: Arc, + channels: Arc, + transport: SharedSerialTransport, + config: StreamConfig, + collector: Box, + ) -> Self { + Self { + running, + channels, + config, + collector, + } + } + + pub fn spawn(self) -> JoinHandle<()> { + thread::spawn(move || self.run()) + } + + fn run(mut self) { + while self.running.load(Ordering::SeqCst) { + if let Err(err) = self.tick() { + let _ = self + .channels + .send_event(DeviceEvent::IoError(err.to_string())); + + self.running.store(false, Ordering::SeqCst); + break; + } + + thread::sleep(StdDuration::from_millis( + self.config.poll_interval_ms as u64, + )); + } + } + + fn tick(&mut self) -> Result<(), SdkError> { + // let _transport = self + // .transport + // .lock() + // .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?; + + let Some(sample) = self.collector.collect_once()? else { + return Ok(()); + }; + self.channels.send_sample(sample)?; + Ok(()) + // TODO: + // 1. encode read request + // 2. transport.write() + // 3. transport.read() + // 4. protocol.decode() + // 5. register parse + // 6. channels.send_sample() + } +} + +pub trait SampleCollector: Send { + fn collect_once(&mut self) -> Result, SdkError>; +} + +pub struct NoopSampleCollector; + +impl SampleCollector for NoopSampleCollector { + fn collect_once(&mut self) -> Result, SdkError> { + Ok(None) + } +} + +pub struct PollingSampleCollector { + transport: SharedSerialTransport, + codec: Box, + config: StreamConfig, + sequence: u32, +} + +impl PollingSampleCollector { + pub fn new(transport: SharedSerialTransport, config: StreamConfig) -> Self { + Self { + transport, + codec: Box::new(EskinProtocolCodec), + config, + sequence: 0, + } + } + fn next_sequence(&mut self) -> u32 { + let sequence = self.sequence; + self.sequence = self.sequence.wrapping_add(1); + sequence + } + fn read_exact_from_transport( + transport: &mut dyn SerialTransport, + buf: &mut [u8], + timeout: Duration, + ) -> Result<(), SdkError> { + let mut offset = 0; + while offset < buf.len() { + let n = transport.read(&mut buf[offset..], timeout)?; + + if n == 0 { + return Err(SdkError::Timeout); + } + + offset += n; + } + + Ok(()) + } + fn read_response_frame( + &self, + transport: &mut dyn SerialTransport, + ) -> Result, SdkError> { + let timeout = Duration::milliseconds(self.config.read_timeout_ms as i64); + + let mut header = [0u8; 4]; + Self::read_exact_from_transport(transport, &mut header, timeout)?; + + let start = u16::from_be_bytes([header[0], header[1]]); + if start != FRAME_START_RESPONSE { + return Err(SdkError::FrameError(format!( + "invalid response start: 0x{start:04X}" + ))); + } + + let data_len = u16::from_le_bytes([header[2], header[3]]) as usize; + let total_len = 4 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN; + + let mut frame = vec![0u8; total_len]; + frame[..4].copy_from_slice(&header); + + Self::read_exact_from_transport(transport, &mut frame[4..], timeout)?; + + Ok(frame) + } + + fn read_register(&mut self, addr: u32, length: u16) -> Result, SdkError> { + let request = ReadRequest { + device_addr: self.config.device_addr, + start_addr: addr, + read_byte_count: length, + }; + + let request_frame = self.codec.encode_read_request(&request)?; + + let response_frame = { + let mut transport = self + .transport + .lock() + .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?; + + transport.flush_rx()?; + transport.write(&request_frame)?; + self.read_response_frame(transport.as_mut())? + }; + + let response = self.codec.decode_read_response(&response_frame)?; + Ok(response.data) + } +} + +impl SampleCollector for PollingSampleCollector { + fn collect_once(&mut self) -> Result, SdkError> { + let _sequence = self.next_sequence(); + + let _combined_force_raw = self.read_register(REG_COMBINED_FORCE, 168)?; + let _module_error_raw = self.read_register(REG_MODULE_ERROR, 56)?; + + // TODO: + // parse combined force + // parse module error + // build FingerSample + + Ok(None) + } +} + +fn make_collector( + config: &StreamConfig, + transport: SharedSerialTransport, +) -> Box { + match config.mode { + StreamMode::Polling => Box::new(PollingSampleCollector::new(transport, config.clone())), + StreamMode::AutoDistribution => Box::new(NoopSampleCollector), + } +} diff --git a/src/transport.rs b/src/transport.rs index 0c6edc1..cc4d95e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -2,6 +2,9 @@ use crate::error::SdkError; use chrono::Duration; use serialport::{ClearBuffer, DataBits, FlowControl, Parity, StopBits}; use std::io::ErrorKind; +use std::sync::{Arc, Mutex}; + +pub type SharedSerialTransport = Arc>>; pub trait SerialTransport: Send { fn open(&mut self) -> Result<(), SdkError>;