Build SDK core streaming skeleton

This commit is contained in:
lenn
2026-05-05 16:48:16 +08:00
parent 79f4055959
commit 60f9ad15e7
6 changed files with 963 additions and 57 deletions

View File

@@ -3,7 +3,7 @@ use crate::{
error::SdkError,
types::{FingerSample, SensorModule},
};
use crossbeam_channel::{Receiver, Sender, bounded};
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender, TrySendError, bounded};
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, Clone)]
@@ -71,23 +71,33 @@ impl ChannelManager {
}
}
pub fn send_sample(&self, sample: FingerSample) {
pub fn send_sample(&self, sample: FingerSample) -> Result<(), SdkError> {
match self.drop_policy {
DropPolicy::DropNewest => {
if self.sample_tx.try_send(sample).is_err() {
self.dropped_samples.fetch_add(1, Ordering::Relaxed);
DropPolicy::DropNewest => match self.sample_tx.try_send(sample) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.record_sample_drop();
Ok(())
}
}
DropPolicy::DropOldest => {
if let Err(crossbeam_channel::TrySendError::Full(captured)) =
self.sample_tx.try_send(sample)
{
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
},
DropPolicy::DropOldest => match self.sample_tx.try_send(sample) {
Ok(()) => Ok(()),
Err(TrySendError::Full(sample)) => {
let _ = self.sample_rx.try_recv();
if self.sample_tx.try_send(captured).is_err() {
self.dropped_samples.fetch_add(1, Ordering::Relaxed);
self.record_sample_drop();
match self.sample_tx.try_send(sample) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
self.record_sample_drop();
Ok(())
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
}
}
}
Err(TrySendError::Disconnected(_)) => Err(SdkError::ChannelClosed),
},
}
}
@@ -95,50 +105,53 @@ impl ChannelManager {
let timeout = std::time::Duration::from_millis(timeout_ms as u64);
self.sample_rx
.recv_timeout(timeout)
.map_err(|_| SdkError::Timeout)
.map_err(|err| match err {
RecvTimeoutError::Timeout => SdkError::Timeout,
RecvTimeoutError::Disconnected => SdkError::ChannelClosed,
})
}
pub fn send_cmd(&self, cmd: DeviceCommand) {
match self.drop_policy {
DropPolicy::DropNewest => {
if self.cmd_tx.try_send(cmd).is_err() {
self.dropped_samples.fetch_add(1, Ordering::Relaxed);
}
}
DropPolicy::DropOldest => {
if let Err(crossbeam_channel::TrySendError::Full(captured)) =
self.cmd_tx.try_send(cmd)
{
let _ = self.cmd_rx.try_recv();
if self.cmd_tx.try_send(captured).is_err() {
self.dropped_samples.fetch_add(1, Ordering::Relaxed);
}
}
}
}
pub fn send_cmd(&self, cmd: DeviceCommand) -> Result<(), SdkError> {
self.cmd_tx.try_send(cmd).map_err(|err| match err {
TrySendError::Full(_) => SdkError::BufferOverflow(1),
TrySendError::Disconnected(_) => SdkError::ChannelClosed,
})
}
pub fn recv_cmd(&self, timeout_ms: u32) -> Result<DeviceCommand, SdkError> {
let timeout = std::time::Duration::from_millis(timeout_ms as u64);
self.cmd_rx
.recv_timeout(timeout)
.map_err(|_| SdkError::Timeout)
self.cmd_rx.recv_timeout(timeout).map_err(|err| match err {
RecvTimeoutError::Timeout => SdkError::Timeout,
RecvTimeoutError::Disconnected => SdkError::ChannelClosed,
})
}
pub fn dropped_count(&self) -> u64 {
self.dropped_samples.load(Ordering::Relaxed)
}
pub fn reset_dropped_count(&self) {
self.dropped_samples.store(0, Ordering::Relaxed);
}
fn record_sample_drop(&self) -> u64 {
self.dropped_samples.fetch_add(1, Ordering::Relaxed) + 1
}
pub fn send_event(&self, event: DeviceEvent) -> Result<(), SdkError> {
self.event_tx
.try_send(event)
.map_err(|_| SdkError::ChannelClosed)
self.event_tx.try_send(event).map_err(|err| match err {
TrySendError::Full(_) => SdkError::BufferOverflow(1),
TrySendError::Disconnected(_) => SdkError::ChannelClosed,
})
}
pub fn recv_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
let timeout = std::time::Duration::from_millis(timeout_ms as u64);
self.event_rx
.recv_timeout(timeout)
.map_err(|_| SdkError::Timeout)
.map_err(|err| match err {
RecvTimeoutError::Timeout => SdkError::Timeout,
RecvTimeoutError::Disconnected => SdkError::ChannelClosed,
})
}
}

View File

@@ -1,13 +1,16 @@
use std::time::Duration;
use chrono::Duration;
use std::sync::{Arc, Mutex, MutexGuard};
use crate::{
channel::{ChannelManager, DeviceEvent},
config::{DeviceConfig, DeviceInfo},
error::SdkError,
protocol::{EskinProtocolCodec, ProtocolCodec},
transport::SerialTransport,
protocol::{
EskinProtocolCodec, FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ProtocolCodec,
ReadRequest, WriteRequest,
},
stream::StreamRuntime,
transport::{SerialTransport, SharedSerialTransport},
types::FingerSample,
};
@@ -22,9 +25,9 @@ pub enum DeviceState {
pub struct EskinDeviceInner {
pub info: DeviceInfo,
pub config: DeviceConfig,
pub channels: ChannelManager,
pub channels: Arc<ChannelManager>,
pub state: DeviceState,
pub transport: Box<dyn SerialTransport>,
pub transport: SharedSerialTransport,
pub codec: Box<dyn ProtocolCodec>,
}
@@ -40,7 +43,25 @@ impl EskinDeviceInner {
Self {
info: DeviceInfo::default(),
config,
channels,
channels: Arc::new(channels),
state: DeviceState::Closed,
transport: Arc::new(Mutex::new(transport)),
codec: Box::new(EskinProtocolCodec),
}
}
pub fn new_shared(config: DeviceConfig, transport: SharedSerialTransport) -> Self {
let channels = ChannelManager::new(
config.sample_capacity,
config.command_capacity,
config.event_capacity,
config.drop_policy,
);
Self {
info: DeviceInfo::default(),
config,
channels: Arc::new(channels),
state: DeviceState::Closed,
transport,
codec: Box::new(EskinProtocolCodec),
@@ -48,13 +69,13 @@ impl EskinDeviceInner {
}
fn read_exact_from_transport(
&mut self,
transport: &mut dyn SerialTransport,
buf: &mut [u8],
timeout: Duration,
) -> Result<(), SdkError> {
let mut offset = 0;
while offset < buf.len() {
let n = self.transport.read(&mut buf[offset..], timeout)?;
let n = transport.read(&mut buf[offset..], timeout)?;
if n == 0 {
return Err(SdkError::Timeout);
@@ -66,10 +87,54 @@ impl EskinDeviceInner {
Ok(())
}
fn read_response_frame(&mut self) -> Result<Vec<u8>, SdkError> {
let timeout = Duration::from_millis(self.config.read_timeout_ms as u64);
fn read_response_frame_from(
&self,
transport: &mut dyn SerialTransport,
) -> Result<Vec<u8>, SdkError> {
let timeout = Duration::milliseconds(self.config.read_timeout_ms as i64);
let mut header = [0u8; 4];
self.read_exact_from_transport(&mut header, timeout)?;
Self::read_exact_from_transport(transport, &mut header, timeout)?;
let start = u16::from_be_bytes([header[0], header[1]]);
if start != FRAME_START_RESPONSE {
return Err(SdkError::FrameError(format!(
"invalid response start: 0x{start:04X}"
)));
}
let data_len = u16::from_le_bytes([header[2], header[3]]) as usize;
let total_len = 4 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN;
let mut frame = vec![0u8; total_len];
frame[..4].copy_from_slice(&header);
Self::read_exact_from_transport(transport, &mut frame[4..], timeout)?;
Ok(frame)
}
fn lock_transport(&self) -> Result<MutexGuard<'_, Box<dyn SerialTransport>>, SdkError> {
self.transport
.lock()
.map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))
}
fn ensure_open(&self) -> Result<(), SdkError> {
match self.state {
DeviceState::Open | DeviceState::Streaming => Ok(()),
DeviceState::Closed => Err(SdkError::NotInitialized),
DeviceState::Error => Err(SdkError::InternalError("device is in error state".into())),
}
}
pub fn channels(&self) -> Arc<ChannelManager> {
Arc::clone(&self.channels)
}
pub fn create_stream_runtime(&self) -> StreamRuntime {
StreamRuntime::new(Arc::clone(&self.channels), Arc::clone(&self.transport))
}
pub fn shared_transport(&self) -> SharedSerialTransport {
Arc::clone(&self.transport)
}
}
@@ -87,3 +152,117 @@ pub trait EskinDevice {
fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError>;
fn write_register(&mut self, addr: u32, data: &[u8]) -> Result<u16, SdkError>;
}
impl EskinDevice for EskinDeviceInner {
fn open(&mut self) -> Result<(), SdkError> {
{
let mut transport = self.lock_transport()?;
transport.open()?;
transport.flush_rx()?;
}
self.state = DeviceState::Open;
Ok(())
}
fn close(&mut self) -> Result<(), SdkError> {
{
let mut transport = self.lock_transport()?;
transport.close()?;
}
self.state = DeviceState::Closed;
Ok(())
}
fn state(&self) -> DeviceState {
self.state
}
fn device_info(&self) -> Result<DeviceInfo, SdkError> {
Ok(self.info.clone())
}
fn config(&self) -> &DeviceConfig {
&self.config
}
fn apply_config(&mut self, config: DeviceConfig) -> Result<(), SdkError> {
self.config = config;
Ok(())
}
fn start_stream(&mut self) -> Result<(), SdkError> {
if self.state == DeviceState::Streaming {
return Err(SdkError::AlreadyStreaming);
}
if self.state != DeviceState::Open {
return Err(SdkError::NotInitialized);
}
self.state = DeviceState::Streaming;
self.channels.send_event(DeviceEvent::StreamStarted)?;
Ok(())
}
fn stop_stream(&mut self) -> Result<(), SdkError> {
if self.state != DeviceState::Streaming {
return Err(SdkError::NotStreaming);
}
self.state = DeviceState::Open;
self.channels.send_event(DeviceEvent::StreamStopped)?;
Ok(())
}
fn read_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError> {
self.channels.recv_sample(timeout_ms)
}
fn read_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
self.channels.recv_event(timeout_ms)
}
fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError> {
self.ensure_open()?;
let request = ReadRequest {
device_addr: self.config.device_addr,
start_addr: addr,
read_byte_count: length,
};
let request_frame = self.codec.encode_read_request(&request)?;
let response_frame = {
let mut transport = self.lock_transport()?;
transport.flush_rx()?;
transport.write(&request_frame)?;
self.read_response_frame_from(transport.as_mut())?
};
let response = self.codec.decode_read_response(&response_frame)?;
Ok(response.data)
}
fn write_register(&mut self, addr: u32, data: &[u8]) -> Result<u16, SdkError> {
self.ensure_open()?;
let request = WriteRequest {
device_addr: self.config.device_addr,
start_addr: addr,
data: data.to_vec(),
};
let request_frame = self.codec.encode_write_request(&request)?;
let response_frame = {
let mut transport = self.lock_transport()?;
transport.flush_rx()?;
transport.write(&request_frame)?;
self.read_response_frame_from(transport.as_mut())?
};
let response = self.codec.decode_write_response(&response_frame)?;
Ok(response.return_byte_count)
}
}

View File

@@ -1,7 +1,7 @@
use crate::{
config::DeviceInfo,
error::SdkError,
types::{DistributionForce, SensorModule},
types::{DistributionForce, ForcePoint, SensorModule},
};
pub const REG_SERIAL_NUMBER: u32 = 0x0000;
@@ -39,6 +39,57 @@ pub struct RegisterSpec {
pub value_type: RegisterValueType,
}
pub const DEVICE_INFO_REGISTERS: &[RegisterSpec] = &[
RegisterSpec {
addr: REG_SERIAL_NUMBER,
len: 4,
access: RegisterAccess::ReadOnly,
value_type: RegisterValueType::U32,
},
RegisterSpec {
addr: REG_FIRMWARE_VERSION,
len: 2,
access: RegisterAccess::ReadOnly,
value_type: RegisterValueType::U16,
},
RegisterSpec {
addr: REG_CALIBRATION_GROUP,
len: 2,
access: RegisterAccess::ReadOnly,
value_type: RegisterValueType::U16,
},
RegisterSpec {
addr: REG_MODULE_ACTIVE_STATUS,
len: 2,
access: RegisterAccess::ReadOnly,
value_type: RegisterValueType::U16,
},
RegisterSpec {
addr: REG_L_LINE,
len: 2,
access: RegisterAccess::ReadOnly,
value_type: RegisterValueType::U16,
},
RegisterSpec {
addr: REG_H_LINE,
len: 2,
access: RegisterAccess::ReadOnly,
value_type: RegisterValueType::U16,
},
RegisterSpec {
addr: REG_PRODUCT_CONFIG_1,
len: 4,
access: RegisterAccess::ReadWrite,
value_type: RegisterValueType::U32,
},
RegisterSpec {
addr: REG_PRODUCT_CONFIG_2,
len: 4,
access: RegisterAccess::ReadWrite,
value_type: RegisterValueType::U32,
},
];
pub trait RegisterMap {
fn device_info_registers(&self) -> &'static [RegisterSpec];
fn distribution_register(&self, module: SensorModule) -> Result<RegisterSpec, SdkError>;
@@ -55,7 +106,7 @@ pub struct EskinRegisterMap;
impl RegisterMap for EskinRegisterMap {
fn device_info_registers(&self) -> &'static [RegisterSpec] {
todo!("device info register specs")
DEVICE_INFO_REGISTERS
}
fn distribution_register(&self, _module: SensorModule) -> Result<RegisterSpec, SdkError> {
@@ -68,9 +119,29 @@ impl RegisterMap for EskinRegisterMap {
fn parse_distribution_force(
&self,
_module: SensorModule,
_raw: &[u8],
module: SensorModule,
raw: &[u8],
) -> Result<DistributionForce, SdkError> {
todo!("parse distribution force")
if raw.len() % 3 != 0 {
return Err(SdkError::FrameError(format!(
"distribution force length must be multiple of 3, got {}",
raw.len()
)));
}
let points = raw
.chunks_exact(3)
.map(|chunk| ForcePoint {
fx: chunk[0] as i8,
fy: chunk[1] as i8,
fz: chunk[2] as i8,
})
.collect::<Vec<_>>();
Ok(DistributionForce {
module,
point_count: points.len() as u16,
points,
})
}
}

View File

@@ -1,9 +1,22 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use crate::register::{REG_COMBINED_FORCE, REG_MODULE_ERROR};
use chrono::Duration;
use crate::{
channel::DeviceEvent,
channel::{ChannelManager, DeviceEvent},
error::SdkError,
protocol::{EskinProtocolCodec, ProtocolCodec},
transport::{self, SerialTransport, SharedSerialTransport},
types::{FingerSample, SensorModule},
};
use crate::protocol::{FRAME_CRC_LEN, FRAME_START_RESPONSE, FRAME_STATUS_LEN, ReadRequest};
use std::thread::{self, JoinHandle};
use std::time::Duration as StdDuration;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode {
Polling,
@@ -16,6 +29,8 @@ pub struct StreamConfig {
pub read_distribution: bool,
pub modules: Vec<SensorModule>,
pub poll_interval_ms: u32,
pub device_addr: u8,
pub read_timeout_ms: u32,
}
impl Default for StreamConfig {
@@ -25,6 +40,8 @@ impl Default for StreamConfig {
read_distribution: true,
modules: Vec::new(),
poll_interval_ms: 10,
device_addr: 0x34,
read_timeout_ms: 100,
}
}
}
@@ -36,3 +53,280 @@ pub trait StreamController: Send {
fn next_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError>;
fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError>;
}
pub struct StreamRuntime {
running: Arc<AtomicBool>,
config: Option<StreamConfig>,
channels: Arc<ChannelManager>,
transport: SharedSerialTransport,
worker: Option<JoinHandle<()>>,
}
impl StreamRuntime {
pub fn new(channels: Arc<ChannelManager>, transport: SharedSerialTransport) -> Self {
Self {
running: Arc::new(AtomicBool::new(false)),
config: None,
channels,
transport,
worker: None,
}
}
pub fn running_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.running)
}
pub fn config(&self) -> Option<&StreamConfig> {
self.config.as_ref()
}
pub fn transport(&self) -> SharedSerialTransport {
Arc::clone(&self.transport)
}
}
impl StreamController for StreamRuntime {
fn start(&mut self, config: StreamConfig) -> Result<(), SdkError> {
if self.running.swap(true, Ordering::SeqCst) {
return Err(SdkError::AlreadyStreaming);
}
let collector = make_collector(&config, Arc::clone(&self.transport));
let worker = StreamWorker::new(
Arc::clone(&self.running),
Arc::clone(&self.channels),
Arc::clone(&self.transport),
config.clone(),
collector,
)
.spawn();
self.worker = Some(worker);
self.config = Some(config);
self.channels.send_event(DeviceEvent::StreamStarted)?;
Ok(())
}
fn stop(&mut self) -> Result<(), SdkError> {
if !self.running.swap(false, Ordering::SeqCst) {
return Err(SdkError::NotStreaming);
}
self.channels.send_event(DeviceEvent::StreamStopped)?;
if let Some(worker) = self.worker.take() {
worker
.join()
.map_err(|_| SdkError::InternalError("stream worker panicked".into()))?;
}
Ok(())
}
fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
fn next_sample(&self, timeout_ms: u32) -> Result<FingerSample, SdkError> {
self.channels.recv_sample(timeout_ms)
}
fn next_event(&self, timeout_ms: u32) -> Result<DeviceEvent, SdkError> {
self.channels.recv_event(timeout_ms)
}
}
pub struct StreamWorker {
running: Arc<AtomicBool>,
channels: Arc<ChannelManager>,
config: StreamConfig,
collector: Box<dyn SampleCollector>,
}
impl StreamWorker {
pub fn new(
running: Arc<AtomicBool>,
channels: Arc<ChannelManager>,
transport: SharedSerialTransport,
config: StreamConfig,
collector: Box<dyn SampleCollector>,
) -> Self {
Self {
running,
channels,
config,
collector,
}
}
pub fn spawn(self) -> JoinHandle<()> {
thread::spawn(move || self.run())
}
fn run(mut self) {
while self.running.load(Ordering::SeqCst) {
if let Err(err) = self.tick() {
let _ = self
.channels
.send_event(DeviceEvent::IoError(err.to_string()));
self.running.store(false, Ordering::SeqCst);
break;
}
thread::sleep(StdDuration::from_millis(
self.config.poll_interval_ms as u64,
));
}
}
fn tick(&mut self) -> Result<(), SdkError> {
// let _transport = self
// .transport
// .lock()
// .map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
let Some(sample) = self.collector.collect_once()? else {
return Ok(());
};
self.channels.send_sample(sample)?;
Ok(())
// TODO:
// 1. encode read request
// 2. transport.write()
// 3. transport.read()
// 4. protocol.decode()
// 5. register parse
// 6. channels.send_sample()
}
}
pub trait SampleCollector: Send {
fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError>;
}
pub struct NoopSampleCollector;
impl SampleCollector for NoopSampleCollector {
fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError> {
Ok(None)
}
}
pub struct PollingSampleCollector {
transport: SharedSerialTransport,
codec: Box<dyn ProtocolCodec>,
config: StreamConfig,
sequence: u32,
}
impl PollingSampleCollector {
pub fn new(transport: SharedSerialTransport, config: StreamConfig) -> Self {
Self {
transport,
codec: Box::new(EskinProtocolCodec),
config,
sequence: 0,
}
}
fn next_sequence(&mut self) -> u32 {
let sequence = self.sequence;
self.sequence = self.sequence.wrapping_add(1);
sequence
}
fn read_exact_from_transport(
transport: &mut dyn SerialTransport,
buf: &mut [u8],
timeout: Duration,
) -> Result<(), SdkError> {
let mut offset = 0;
while offset < buf.len() {
let n = transport.read(&mut buf[offset..], timeout)?;
if n == 0 {
return Err(SdkError::Timeout);
}
offset += n;
}
Ok(())
}
fn read_response_frame(
&self,
transport: &mut dyn SerialTransport,
) -> Result<Vec<u8>, SdkError> {
let timeout = Duration::milliseconds(self.config.read_timeout_ms as i64);
let mut header = [0u8; 4];
Self::read_exact_from_transport(transport, &mut header, timeout)?;
let start = u16::from_be_bytes([header[0], header[1]]);
if start != FRAME_START_RESPONSE {
return Err(SdkError::FrameError(format!(
"invalid response start: 0x{start:04X}"
)));
}
let data_len = u16::from_le_bytes([header[2], header[3]]) as usize;
let total_len = 4 + data_len + FRAME_STATUS_LEN + FRAME_CRC_LEN;
let mut frame = vec![0u8; total_len];
frame[..4].copy_from_slice(&header);
Self::read_exact_from_transport(transport, &mut frame[4..], timeout)?;
Ok(frame)
}
fn read_register(&mut self, addr: u32, length: u16) -> Result<Vec<u8>, SdkError> {
let request = ReadRequest {
device_addr: self.config.device_addr,
start_addr: addr,
read_byte_count: length,
};
let request_frame = self.codec.encode_read_request(&request)?;
let response_frame = {
let mut transport = self
.transport
.lock()
.map_err(|_| SdkError::InternalError("transport mutex poisoned".into()))?;
transport.flush_rx()?;
transport.write(&request_frame)?;
self.read_response_frame(transport.as_mut())?
};
let response = self.codec.decode_read_response(&response_frame)?;
Ok(response.data)
}
}
impl SampleCollector for PollingSampleCollector {
fn collect_once(&mut self) -> Result<Option<FingerSample>, SdkError> {
let _sequence = self.next_sequence();
let _combined_force_raw = self.read_register(REG_COMBINED_FORCE, 168)?;
let _module_error_raw = self.read_register(REG_MODULE_ERROR, 56)?;
// TODO:
// parse combined force
// parse module error
// build FingerSample
Ok(None)
}
}
fn make_collector(
config: &StreamConfig,
transport: SharedSerialTransport,
) -> Box<dyn SampleCollector> {
match config.mode {
StreamMode::Polling => Box::new(PollingSampleCollector::new(transport, config.clone())),
StreamMode::AutoDistribution => Box::new(NoopSampleCollector),
}
}

View File

@@ -2,6 +2,9 @@ use crate::error::SdkError;
use chrono::Duration;
use serialport::{ClearBuffer, DataBits, FlowControl, Parity, StopBits};
use std::io::ErrorKind;
use std::sync::{Arc, Mutex};
pub type SharedSerialTransport = Arc<Mutex<Box<dyn SerialTransport>>>;
pub trait SerialTransport: Send {
fn open(&mut self) -> Result<(), SdkError>;