use crate::serial_core::codec::Codec; use crate::serial_core::codecs::tactile_a::TactileACodec; use crate::serial_core::frame::{FrameHandler, TactileAFrame, TestFrame}; use crate::serial_core::model::{HudChartState, HudPacket}; use crate::serial_core::record::Recording; use anyhow::Result; use tauri::{AppHandle, Emitter}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time::{self, Duration, MissedTickBehavior}; use tokio_serial::SerialStream; use tokio_util::sync::CancellationToken; use std::future::pending; use std::sync::{Arc, Mutex}; use std::time::Instant; use log::{info, debug}; use crate::serial_core::record::{FrameTiming, RecordedFrame}; pub enum PollMode { Disable, Enabled(Box>) } pub trait SerialFrame: Clone + Send + 'static { fn dts_ms(&self) -> u64; fn to_hud_packet( &self, chart_state: &mut HudChartState, display_values: Option<&[i32]>, ) -> Option; } impl SerialFrame for TestFrame { fn dts_ms(&self) -> u64 { self.dts_ms } fn to_hud_packet( &self, chart_state: &mut HudChartState, display_values: Option<&[i32]>, ) -> Option { Some(chart_state.apply_frame(self, display_values)) } } impl SerialFrame for TactileAFrame { fn dts_ms(&self) -> u64 { match self { TactileAFrame::Req(_) => 0, TactileAFrame::Rep(rep) => rep.dts_ms, } } fn to_hud_packet( &self, chart_state: &mut HudChartState, display_values: Option<&[i32]>, ) -> Option { match self { TactileAFrame::Req(_) => None, TactileAFrame::Rep(rep) => { let proxy = TestFrame { header: rep.meta.header, cmd: rep.meta.func_code, length: rep.meta.except_data_len, payload: rep.payload.clone(), checksum: rep.meta.checksum, dts_ms: rep.dts_ms, }; Some(chart_state.apply_frame(&proxy, display_values)) } } } } pub trait PollRequester: Send { fn poll_interval(&self) -> Option { None } fn should_request(&mut self) -> bool { true } fn next_request(&mut self) -> Result> { Ok(None) } fn on_rx_frame(&mut self, _frame: &F) {} } #[derive(Default)] pub struct NoopPollRequester; impl PollRequester for NoopPollRequester {} pub struct TactileAPollRequester { period: Duration, cols: usize, rows: usize, awaiting_reply: bool, last_request_at: Option, reply_timeout: Duration, } impl TactileAPollRequester { pub fn new(period: Duration, cols: usize, rows: usize, reply_timeout: Duration) -> Self { Self { period, cols, rows, awaiting_reply: false, last_request_at: None, reply_timeout, } } } impl PollRequester for TactileAPollRequester { fn poll_interval(&self) -> Option { Some(self.period) } fn should_request(&mut self) -> bool { if !self.awaiting_reply { return true; } let timed_out = self .last_request_at .map(|t| t.elapsed() >= self.reply_timeout) .unwrap_or(false); if timed_out { self.awaiting_reply = false; self.last_request_at = None; return true; } false } fn next_request(&mut self) -> Result> { let req = TactileACodec::build_req_frame(self.cols, self.rows)?; self.awaiting_reply = true; self.last_request_at = Some(Instant::now()); Ok(Some(req)) } fn on_rx_frame(&mut self, frame: &TactileAFrame) { if matches!(frame, TactileAFrame::Rep(_)) { self.awaiting_reply = false; self.last_request_at = None } } } pub async fn run_serial( app: AppHandle, mut port: SerialStream, mut codec: C, mut handler: H, session_started_at: Instant, recording: Arc>>, cancel: CancellationToken, ) -> Result<()> where F: SerialFrame, C: Codec + Send + 'static, H: FrameHandler + Send + 'static, T: Into { run_serial_with_poll( app, port, codec, handler, session_started_at, recording, cancel, PollMode::Disable ).await } pub async fn run_serial_with_poll( app: AppHandle, mut port: SerialStream, mut codec: C, mut handler: H, session_started_at: Instant, recording: Arc>>, cancel: CancellationToken, poll_mode: PollMode ) -> Result<()> where F: SerialFrame, C: Codec + Send + 'static, H: FrameHandler + Send + 'static, T: Into, { let mut requester = match poll_mode { PollMode::Disable => None, PollMode::Enabled(r) => Some(r), }; let mut poll_interval = requester .as_ref() .and_then(|r| r.poll_interval()) .map(|d| { let mut it = time::interval(d); it.set_missed_tick_behavior(MissedTickBehavior::Skip); it }); let mut chart_state = HudChartState::new(); let mut buffer = [0u8; 1024]; let mut prune_interval = time::interval(Duration::from_millis(450)); prune_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { tokio::select! { _ = cancel.cancelled() => break, _ = async { match poll_interval.as_mut() { Some(it) => { it.tick().await; } None => pending::<()>().await, } } => { if let Some(r) = requester.as_mut() { if r.should_request() { if let Some(req) = r.next_request()? { let bytes = codec.encode(&req)?; port.write_all(&bytes).await?; } } } } _ = prune_interval.tick() => { if let Some(packet) = chart_state.prune_stale() { app.emit("hud_stream", packet)?; } } read_result = port.read(&mut buffer) => { let n = read_result?; if n == 0 { // Some serial drivers can resolve reads with 0 bytes repeatedly. // Yield here so timer-driven poll requests are not starved by a busy loop. tokio::task::yield_now().await; continue; } let frames = codec.decode(&buffer[..n], session_started_at)?; for frame in frames { if let Some(r) = requester.as_mut() { r.on_rx_frame(&frame); } let decode_res = handler .on_frame(&frame) .await? .map(|vals| vals.into_iter().map(Into::into).collect::>()); let mut record = recording.lock().map_err(|_| anyhow::anyhow!("recording state poisoned"))?; record.push(RecordedFrame{ timing: FrameTiming { pts_ms: None, dts_ms: frame.dts_ms() }, frame: frame.clone(), }); let display_values = if let Some(vals) = decode_res.as_ref() { let summary = vals.iter().copied().sum::(); chart_state.record_summary(summary as f32); chart_state.record_pressure_matrix(vals.as_slice()); Some(vec![summary]) } else { None }; if let Some(packet) = frame.to_hud_packet(&mut chart_state, display_values.as_deref()) { app.emit("hud_stream", packet)?; } } } } } Ok(()) }