#include "components/ffmsep/cpstream_core.hh" #include "components/ffmsep/presist/presist.hh" #include "dlog/dlog.hh" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace std::chrono_literals; namespace ffmsep { namespace { constexpr auto kReaderIdleSleep = 5ms; constexpr auto kDecoderIdleSleep = 1ms; const CPCodec* resolve_requested_codec(const CPStreamConfig& config) { if (!config.codec_name.empty()) { if (const CPCodec* codec = cpcodec_find_decoder_by_name(config.codec_name)) { return codec; } } if (config.codec_id != CPCodecID::Unknow) { if (const CPCodec* codec = cpcodec_find_decoder(config.codec_id)) { return codec; } } return nullptr; } } // namespace struct CPStreamCore::Impl { struct Packet { std::vector payload; std::int64_t pts = 0; bool end_of_stream = false; bool flush = false; }; explicit Impl(CPStreamConfig config): config_(std::move(config)) { normalize_config(); frame_writer_ = std::make_unique(); } ~Impl() = default; void normalize_config() { if (config_.read_chunk_size == 0U) { config_.read_chunk_size = 256U; } if (config_.packet_queue_capacity == 0U) { config_.packet_queue_capacity = 1U; } if (config_.frame_queue_capacity == 0U) { config_.frame_queue_capacity = 1U; } if (config_.slave_request_interval.count() < 0) { config_.slave_request_interval = 0ms; } frame_queue_capacity_ = config_.frame_queue_capacity; } bool open(const CPStreamConfig& cfg) { stop(); close(); config_ = cfg; normalize_config(); if (config_.port.empty()) { set_last_error("serial port is empty"); return false; } codec_descriptor_ = resolve_requested_codec(config_); if (!codec_descriptor_) { set_last_error("codec not found for requested identifier"); return false; } codec_ctx_ = cpcodec_alloc_context(codec_descriptor_); if (!codec_ctx_) { set_last_error("failed to allocate codec context"); return false; } int rc = cpcodec_open(codec_ctx_, codec_descriptor_); if (rc < CP_SUCCESS) { set_last_error("failed to open codec context: error " + std::to_string(rc)); cpcodec_free_context(&codec_ctx_); codec_ctx_ = nullptr; return false; } try { auto serial = std::make_shared( config_.port, config_.baudrate, config_.timeout, config_.bytesize, config_.parity, config_.stopbits, config_.flowcontrol); if (!serial->isOpen()) { serial->open(); } serial->flush(); { std::lock_guard lock(serial_mutex_); serial_ = std::move(serial); } } catch (const serial::IOException& ex) { set_last_error(ex.what() ? ex.what() : "serial IO exception"); cpcodec_close(codec_ctx_); cpcodec_free_context(&codec_ctx_); codec_ctx_ = nullptr; return false; } catch (const serial::SerialException& ex) { set_last_error(ex.what() ? ex.what() : "serial exception"); cpcodec_close(codec_ctx_); cpcodec_free_context(&codec_ctx_); codec_ctx_ = nullptr; return false; } catch (const std::exception& ex) { set_last_error(ex.what()); cpcodec_close(codec_ctx_); cpcodec_free_context(&codec_ctx_); codec_ctx_ = nullptr; return false; } { std::lock_guard lock(packet_mutex_); packet_queue_.clear(); } { std::lock_guard lock(frame_mutex_); frame_queue_.clear(); } pts_counter_.store(0, std::memory_order_relaxed); stop_requested_.store(false, std::memory_order_release); set_last_error({}); return true; } bool open() { return open(config_); } bool reopen(const CPStreamConfig& cfg) { return open(cfg); } void close() { stop(); { std::lock_guard lock(serial_mutex_); if (serial_) { try { if (serial_->isOpen()) { serial_->close(); } } catch (...) { // Ignore close errors. } serial_.reset(); } } if (codec_ctx_) { cpcodec_close(codec_ctx_); cpcodec_free_context(&codec_ctx_); codec_ctx_ = nullptr; } { std::lock_guard lock(packet_mutex_); packet_queue_.clear(); } { std::lock_guard lock(frame_mutex_); frame_queue_.clear(); frame_record_queue_.clear(); } } bool start() { if (running_.load(std::memory_order_acquire)) { return true; } std::shared_ptr serial_copy; { std::lock_guard lock(serial_mutex_); serial_copy = serial_; } if (!serial_copy || !serial_copy->isOpen()) { set_last_error("serial port is not open"); return false; } if (!codec_ctx_ || !codec_ctx_->is_open) { set_last_error("codec context is not ready"); return false; } stop_requested_.store(false, std::memory_order_release); running_.store(true, std::memory_order_release); reader_thread_ = std::thread(&Impl::reader_loop, this); decoder_thread_ = std::thread(&Impl::decoder_loop, this); if (!config_.slave_request_command.empty()) { slave_thread_ = std::thread(&Impl::slave_loop, this); } return true; } void stop() { if (!running_.exchange(false, std::memory_order_acq_rel)) { return; } stop_requested_.store(true, std::memory_order_release); packet_cv_.notify_all(); if (reader_thread_.joinable()) { reader_thread_.join(); } if (slave_thread_.joinable()) { slave_thread_.join(); } signal_decoder_flush(true); packet_cv_.notify_all(); if (decoder_thread_.joinable()) { decoder_thread_.join(); } stop_requested_.store(false, std::memory_order_release); { std::lock_guard lock(packet_mutex_); packet_queue_.clear(); } if (codec_ctx_ && codec_ctx_->is_open) { reset_decoder(); } } bool is_open() const { std::lock_guard lock(serial_mutex_); return serial_ && serial_->isOpen(); } bool is_running() const { return running_.load(std::memory_order_acquire); } bool send(const std::vector& data) { return send(data.data(), data.size()); } bool send(const std::uint8_t* data, std::size_t size) { if (!data || size == 0U) { return false; } std::shared_ptr serial_copy; { std::lock_guard lock(serial_mutex_); serial_copy = serial_; } if (!serial_copy || !serial_copy->isOpen()) { set_last_error("serial port is not open"); return false; } try { const auto written = serial_copy->write(data, size); return written == size; } catch (const serial::IOException& ex) { set_last_error(ex.what() ? ex.what() : "serial IO exception"); } catch (const serial::SerialException& ex) { set_last_error(ex.what() ? ex.what() : "serial exception"); } catch (const std::exception& ex) { set_last_error(ex.what()); } return false; } std::optional> try_pop_frame() { std::lock_guard lock(frame_mutex_); if (frame_queue_.empty()) { return std::nullopt; } std::shared_ptr frame = std::move(frame_queue_.front()); frame_queue_.pop_front(); return frame; } bool wait_for_frame(std::shared_ptr& frame, std::chrono::milliseconds timeout) { std::unique_lock lock(frame_mutex_); if (!frame_cv_.wait_for(lock, timeout, [&] { return !frame_queue_.empty(); })) { return false; } frame = std::move(frame_queue_.front()); frame_queue_.pop_front(); return true; } void clear_frames() { std::lock_guard lock(frame_mutex_); frame_queue_.clear(); } void set_frame_queue_capacity(std::size_t capacity) { if (capacity == 0U) { capacity = 1U; } { std::lock_guard lock(frame_mutex_); frame_queue_capacity_ = capacity; config_.frame_queue_capacity = capacity; while (frame_queue_.size() > frame_queue_capacity_) { frame_queue_.pop_front(); } } } void clear_recorded_frames() { std::lock_guard lock(frame_mutex_); frame_record_queue_.clear(); } std::size_t recorded_frame_count() const { std::lock_guard lock(frame_mutex_); return frame_record_queue_.size(); } std::future export_recorded_frames(const std::string& path, bool clear_after_export) { if (!frame_writer_) { frame_writer_ = std::make_unique(); } std::deque> snapshot; { std::lock_guard lock(frame_mutex_); snapshot = frame_record_queue_; if (clear_after_export) { frame_record_queue_.clear(); } } if (snapshot.empty()) { std::promise promise; auto future = promise.get_future(); promise.set_value(persist::WriteResult{ false, "no recorded frames available", path }); return future; } return frame_writer_->enqueue(path, std::move(snapshot)); } void set_frame_callback(FrameCallback callback) { std::lock_guard lock(callback_mutex_); frame_callback_ = std::move(callback); } CPStreamConfig config() const { return config_; } std::string last_error() const { std::lock_guard lock(last_error_mutex_); return last_error_; } static std::vector list_ports() { return serial::list_ports(); } void reader_loop() { std::vector buffer(config_.read_chunk_size); while (!stop_requested_.load(std::memory_order_acquire)) { std::shared_ptr serial_copy; { std::lock_guard lock(serial_mutex_); serial_copy = serial_; } if (!serial_copy || !serial_copy->isOpen()) { std::this_thread::sleep_for(kReaderIdleSleep); continue; } std::size_t bytes_read = 0; try { bytes_read = serial_copy->read(buffer.data(), buffer.size()); qDebug() << "bytes_read: " << bytes_read; } catch (const serial::IOException& ex) { set_last_error(ex.what() ? ex.what() : "serial IO exception"); std::this_thread::sleep_for(kReaderIdleSleep); continue; } catch (const serial::SerialException& ex) { set_last_error(ex.what() ? ex.what() : "serial exception"); std::this_thread::sleep_for(kReaderIdleSleep); continue; } catch (const std::exception& ex) { set_last_error(ex.what()); std::this_thread::sleep_for(kReaderIdleSleep); continue; } if (bytes_read == 0U) { std::this_thread::sleep_for(kReaderIdleSleep); continue; } const auto format_command = [](const std::vector& data) -> std::string { if (data.empty()) { return "[]"; } std::ostringstream oss; oss << '[' << std::uppercase << std::setfill('0'); for (std::size_t idx = 0; idx < data.size(); ++idx) { if (idx != 0U) { oss << ' '; } oss << std::setw(2) << std::hex << static_cast(data[idx]); } oss << ']'; return oss.str(); }; Packet packet; packet.payload.assign(buffer.begin(), buffer.begin() + static_cast(bytes_read)); // std::cout << "======payload======" << std::endl; // std::cout << format_command(packet.payload) << std::endl; packet.pts = pts_counter_.fetch_add(1, std::memory_order_relaxed); { std::lock_guard lock(packet_mutex_); if (packet_queue_.size() >= config_.packet_queue_capacity) { packet_queue_.pop_front(); } packet_queue_.push_back(std::move(packet)); } packet_cv_.notify_one(); } } void slave_loop() { const auto command = config_.slave_request_command; auto interval = config_.slave_request_interval; if (interval.count() < 0) { interval = 0ms; } const bool repeat = interval.count() > 0; while (!stop_requested_.load(std::memory_order_acquire)) { const bool success = send(command); if (!success) { std::this_thread::sleep_for(kReaderIdleSleep); continue; } if (!repeat) { break; } auto remaining = interval; while (remaining.count() > 0 && !stop_requested_.load(std::memory_order_acquire)) { const auto step = std::min(remaining, kReaderIdleSleep); std::this_thread::sleep_for(step); remaining -= step; } } } void decoder_loop() { while (true) { Packet packet; { std::unique_lock lock(packet_mutex_); packet_cv_.wait(lock, [&] { return stop_requested_.load(std::memory_order_acquire) || !packet_queue_.empty(); }); if (packet_queue_.empty()) { if (stop_requested_.load(std::memory_order_acquire)) { break; } continue; } packet = std::move(packet_queue_.front()); packet_queue_.pop_front(); } if (!codec_ctx_ || !codec_ctx_->is_open) { if (packet.end_of_stream) { break; } std::this_thread::sleep_for(kDecoderIdleSleep); continue; } CPPacket cp_packet; cp_packet.payload = std::move(packet.payload); cp_packet.pts = packet.pts; cp_packet.dts = packet.pts; cp_packet.end_of_stream = packet.end_of_stream; cp_packet.flush = packet.flush; int rc = cpcodec_send_packet(codec_ctx_, &cp_packet); if (rc < CP_SUCCESS) { if (packet.end_of_stream) { break; } continue; } while (true) { CPFrame frame; rc = cpcodec_receive_frame(codec_ctx_, &frame); if (rc == CP_SUCCESS) { auto decoded = std::make_shared(); decoded->pts = frame.pts; decoded->received_at = std::chrono::steady_clock::now(); decoded->frame = std::move(frame); decoded->id = codec_descriptor_ ? codec_descriptor_->id : CPCodecID::Unknow; if (decoded->id == CPCodecID::Tactile) { if (auto parsed = tactile::parse_frame(decoded->frame)) { decoded->tactile = parsed; decoded->tactile_pressures = tactile::parse_pressure_values(*parsed); if (auto matrix = tactile::parse_matrix_size_payload(*parsed)) { decoded->tactile_matrix_size = matrix; } } } else if (decoded->id == CPCodecID::PiezoresistiveB) { decoded->tactile_pressures = tactile::parse_piezoresistive_b_pressures(decoded->frame); } FrameCallback callback_copy; { std::lock_guard lock(callback_mutex_); callback_copy = frame_callback_; } if (callback_copy) { callback_copy(decoded); } { std::lock_guard lock(frame_mutex_); if (frame_queue_.size() >= frame_queue_capacity_) { frame_queue_.pop_front(); } frame_queue_.push_back(decoded); if (decoded->id == CPCodecID::Tactile || decoded->id == CPCodecID::PiezoresistiveB) { frame_record_queue_.push_back(decoded); } } frame_cv_.notify_one(); } else if (rc == CP_ERROR_EAGAIN) { break; } else { if (rc == CP_ERROR_EOF && packet.end_of_stream) { return; } break; } } if (packet.end_of_stream) { break; } } } void signal_decoder_flush(bool end_of_stream) { Packet packet; packet.flush = true; packet.end_of_stream = end_of_stream; { std::lock_guard lock(packet_mutex_); packet_queue_.push_back(std::move(packet)); } packet_cv_.notify_one(); } void reset_decoder() { if (!codec_ctx_ || !codec_descriptor_) { return; } cpcodec_close(codec_ctx_); int rc = cpcodec_open(codec_ctx_, codec_descriptor_); if (rc < CP_SUCCESS) { set_last_error("failed to reset codec context: error " + std::to_string(rc)); } } void set_last_error(std::string message) { std::lock_guard lock(last_error_mutex_); last_error_ = std::move(message); } CPStreamConfig config_{}; const CPCodec* codec_descriptor_ = nullptr; std::shared_ptr serial_; mutable std::mutex serial_mutex_; CPCodecContext* codec_ctx_ = nullptr; std::thread reader_thread_; std::thread slave_thread_; std::thread decoder_thread_; std::mutex packet_mutex_; std::condition_variable packet_cv_; std::deque packet_queue_; mutable std::mutex frame_mutex_; std::condition_variable frame_cv_; // std::deque frame_queue_; // 更新为智能指针,我们需要更长的生命周期😊 std::deque> frame_queue_; std::deque> frame_record_queue_; std::size_t frame_queue_capacity_ = 16; FrameCallback frame_callback_; mutable std::mutex callback_mutex_; std::atomic running_{ false }; std::atomic stop_requested_{ false }; std::atomic pts_counter_{ 0 }; std::string last_error_; mutable std::mutex last_error_mutex_; std::unique_ptr frame_writer_; }; CPStreamCore::CPStreamCore(CPStreamConfig config): impl_(std::make_unique(std::move(config))) {} CPStreamCore::~CPStreamCore() { if (impl_) { impl_->stop(); impl_->close(); } } bool CPStreamCore::open(const CPStreamConfig& config) { return impl_->open(config); } bool CPStreamCore::open() { return impl_->open(); } bool CPStreamCore::reopen(const CPStreamConfig& config) { return impl_->reopen(config); } void CPStreamCore::close() { impl_->close(); } bool CPStreamCore::start() { return impl_->start(); } void CPStreamCore::stop() { impl_->stop(); } bool CPStreamCore::is_open() const noexcept { return impl_->is_open(); } bool CPStreamCore::is_running() const noexcept { return impl_->is_running(); } bool CPStreamCore::send(const std::vector& data) { return impl_->send(data); } bool CPStreamCore::send(const std::uint8_t* data, std::size_t size) { return impl_->send(data, size); } std::optional> CPStreamCore::try_pop_frame() { return impl_->try_pop_frame(); } bool CPStreamCore::wait_for_frame(std::shared_ptr& frame, std::chrono::milliseconds timeout) { return impl_->wait_for_frame(frame, timeout); } void CPStreamCore::clear_frames() { impl_->clear_frames(); } void CPStreamCore::set_frame_queue_capacity(std::size_t capacity) { impl_->set_frame_queue_capacity(capacity); } void CPStreamCore::clear_recorded_frames() { impl_->clear_recorded_frames(); } std::size_t CPStreamCore::recorded_frame_count() const { return impl_->recorded_frame_count(); } std::future CPStreamCore::export_recorded_frames(const std::string& path, bool clear_after_export) { return impl_->export_recorded_frames(path, clear_after_export); } void CPStreamCore::set_frame_callback(FrameCallback callback) { impl_->set_frame_callback(std::move(callback)); } CPStreamConfig CPStreamCore::config() const { return impl_->config(); } std::string CPStreamCore::last_error() const { return impl_->last_error(); } std::vector CPStreamCore::list_available_ports() { return Impl::list_ports(); } } // namespace ffmsep