feat:heapmap with value;fix:qcustomplot warning

This commit is contained in:
2025-11-04 10:47:41 +08:00
parent f411ab21cb
commit a07ff7d6b7
9 changed files with 852 additions and 389 deletions

View File

@@ -1,5 +1,8 @@
#include "components/ffmsep/cpstream_core.hh"
#include "components/ffmsep/presist/presist.hh"
#include "dlog/dlog.hh"
#include <algorithm>
#include <atomic>
#include <chrono>
@@ -7,19 +10,21 @@
#include <cstddef>
#include <cstdint>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>
using namespace std::chrono_literals;
namespace ffmsep {
namespace {
constexpr auto kReaderIdleSleep = std::chrono::milliseconds(5);
constexpr auto kDecoderIdleSleep = std::chrono::milliseconds(1);
constexpr auto kReaderIdleSleep = 5ms;
constexpr auto kDecoderIdleSleep = 1ms;
const CPCodec* resolve_requested_codec(const CPStreamConfig& config) {
if (!config.codec_name.empty()) {
@@ -48,6 +53,7 @@ struct CPStreamCore::Impl {
explicit Impl(CPStreamConfig config)
: config_(std::move(config)) {
normalize_config();
frame_writer_ = std::make_unique<persist::JsonWritter>();
}
~Impl() = default;
@@ -63,7 +69,7 @@ struct CPStreamCore::Impl {
config_.frame_queue_capacity = 1U;
}
if (config_.slave_request_interval.count() < 0) {
config_.slave_request_interval = std::chrono::milliseconds{0};
config_.slave_request_interval = 0ms;
}
frame_queue_capacity_ = config_.frame_queue_capacity;
}
@@ -115,7 +121,7 @@ struct CPStreamCore::Impl {
serial->flush();
{
std::lock_guard lock(serial_mutex_);
std::lock_guard<std::mutex> lock(serial_mutex_);
serial_ = std::move(serial);
}
} catch (const serial::IOException& ex) {
@@ -139,11 +145,11 @@ struct CPStreamCore::Impl {
}
{
std::lock_guard lock(packet_mutex_);
std::lock_guard<std::mutex> lock(packet_mutex_);
packet_queue_.clear();
}
{
std::lock_guard lock(frame_mutex_);
std::lock_guard<std::mutex> lock(frame_mutex_);
frame_queue_.clear();
}
pts_counter_.store(0, std::memory_order_relaxed);
@@ -164,7 +170,7 @@ struct CPStreamCore::Impl {
stop();
{
std::lock_guard lock(serial_mutex_);
std::lock_guard<std::mutex> lock(serial_mutex_);
if (serial_) {
try {
if (serial_->isOpen()) {
@@ -184,12 +190,13 @@ struct CPStreamCore::Impl {
}
{
std::lock_guard lock(packet_mutex_);
std::lock_guard<std::mutex> lock(packet_mutex_);
packet_queue_.clear();
}
{
std::lock_guard lock(frame_mutex_);
std::lock_guard<std::mutex> lock(frame_mutex_);
frame_queue_.clear();
frame_record_queue_.clear();
}
}
@@ -200,7 +207,7 @@ struct CPStreamCore::Impl {
std::shared_ptr<serial::Serial> serial_copy;
{
std::lock_guard lock(serial_mutex_);
std::lock_guard<std::mutex> lock(serial_mutex_);
serial_copy = serial_;
}
if (!serial_copy || !serial_copy->isOpen()) {
@@ -249,7 +256,7 @@ struct CPStreamCore::Impl {
stop_requested_.store(false, std::memory_order_release);
{
std::lock_guard lock(packet_mutex_);
std::lock_guard<std::mutex> lock(packet_mutex_);
packet_queue_.clear();
}
@@ -259,7 +266,7 @@ struct CPStreamCore::Impl {
}
bool is_open() const {
std::lock_guard lock(serial_mutex_);
std::lock_guard<std::mutex> lock(serial_mutex_);
return serial_ && serial_->isOpen();
}
@@ -278,7 +285,7 @@ struct CPStreamCore::Impl {
std::shared_ptr<serial::Serial> serial_copy;
{
std::lock_guard lock(serial_mutex_);
std::lock_guard<std::mutex> lock(serial_mutex_);
serial_copy = serial_;
}
@@ -300,17 +307,17 @@ struct CPStreamCore::Impl {
return false;
}
std::optional<DecodedFrame> try_pop_frame() {
std::lock_guard lock(frame_mutex_);
std::optional<std::shared_ptr<DecodedFrame>> try_pop_frame() {
std::lock_guard<std::mutex> lock(frame_mutex_);
if (frame_queue_.empty()) {
return std::nullopt;
}
DecodedFrame frame = std::move(frame_queue_.front());
std::shared_ptr<DecodedFrame> frame = std::move(frame_queue_.front());
frame_queue_.pop_front();
return frame;
}
bool wait_for_frame(DecodedFrame& frame, std::chrono::milliseconds timeout) {
bool wait_for_frame(std::shared_ptr<DecodedFrame>& frame, std::chrono::milliseconds timeout) {
std::unique_lock lock(frame_mutex_);
if (!frame_cv_.wait_for(lock, timeout, [&] {
return !frame_queue_.empty();
@@ -323,7 +330,7 @@ struct CPStreamCore::Impl {
}
void clear_frames() {
std::lock_guard lock(frame_mutex_);
std::lock_guard<std::mutex> lock(frame_mutex_);
frame_queue_.clear();
}
@@ -332,7 +339,7 @@ struct CPStreamCore::Impl {
capacity = 1U;
}
{
std::lock_guard lock(frame_mutex_);
std::lock_guard<std::mutex> lock(frame_mutex_);
frame_queue_capacity_ = capacity;
config_.frame_queue_capacity = capacity;
while (frame_queue_.size() > frame_queue_capacity_) {
@@ -341,8 +348,46 @@ struct CPStreamCore::Impl {
}
}
void clear_recorded_frames() {
std::lock_guard<std::mutex> lock(frame_mutex_);
frame_record_queue_.clear();
}
std::size_t recorded_frame_count() const {
std::lock_guard<std::mutex> lock(frame_mutex_);
return frame_record_queue_.size();
}
std::future<persist::WriteResult> export_recorded_frames(const std::string& path, bool clear_after_export) {
if (!frame_writer_) {
frame_writer_ = std::make_unique<persist::JsonWritter>();
}
std::deque<std::shared_ptr<DecodedFrame>> snapshot;
{
std::lock_guard<std::mutex> lock(frame_mutex_);
snapshot = frame_record_queue_;
if (clear_after_export) {
frame_record_queue_.clear();
}
}
if (snapshot.empty()) {
std::promise<persist::WriteResult> promise;
auto future = promise.get_future();
promise.set_value(persist::WriteResult{
false,
"no recorded frames available",
path
});
return future;
}
return frame_writer_->enqueue(path, std::move(snapshot));
}
void set_frame_callback(FrameCallback callback) {
std::lock_guard lock(callback_mutex_);
std::lock_guard<std::mutex> lock(callback_mutex_);
frame_callback_ = std::move(callback);
}
@@ -351,7 +396,7 @@ struct CPStreamCore::Impl {
}
std::string last_error() const {
std::lock_guard lock(last_error_mutex_);
std::lock_guard<std::mutex> lock(last_error_mutex_);
return last_error_;
}
@@ -365,7 +410,7 @@ struct CPStreamCore::Impl {
while (!stop_requested_.load(std::memory_order_acquire)) {
std::shared_ptr<serial::Serial> serial_copy;
{
std::lock_guard lock(serial_mutex_);
std::lock_guard<std::mutex> lock(serial_mutex_);
serial_copy = serial_;
}
if (!serial_copy || !serial_copy->isOpen()) {
@@ -400,7 +445,7 @@ struct CPStreamCore::Impl {
packet.pts = pts_counter_.fetch_add(1, std::memory_order_relaxed);
{
std::lock_guard lock(packet_mutex_);
std::lock_guard<std::mutex> lock(packet_mutex_);
if (packet_queue_.size() >= config_.packet_queue_capacity) {
packet_queue_.pop_front();
}
@@ -414,7 +459,7 @@ struct CPStreamCore::Impl {
const auto command = config_.slave_request_command;
auto interval = config_.slave_request_interval;
if (interval.count() < 0) {
interval = std::chrono::milliseconds{0};
interval = 0ms;
}
const bool repeat = interval.count() > 0;
@@ -484,23 +529,24 @@ struct CPStreamCore::Impl {
CPFrame frame;
rc = cpcodec_receive_frame(codec_ctx_, &frame);
if (rc == CP_SUCCESS) {
DecodedFrame decoded;
decoded.pts = frame.pts;
decoded.received_at = std::chrono::steady_clock::now();
decoded.frame = std::move(frame);
if (codec_descriptor_ && codec_descriptor_->id == CPCodecID::Tactile) {
if (auto parsed = tactile::parse_frame(decoded.frame)) {
decoded.tactile = parsed;
decoded.tactile_pressures = tactile::parse_pressure_values(*parsed);
auto decoded = std::make_shared<DecodedFrame>();
decoded->pts = frame.pts;
decoded->received_at = std::chrono::steady_clock::now();
decoded->frame = std::move(frame);
decoded->id = codec_descriptor_ ? codec_descriptor_->id : CPCodecID::Unknow;
if (decoded->id == CPCodecID::Tactile) {
if (auto parsed = tactile::parse_frame(decoded->frame)) {
decoded->tactile = parsed;
decoded->tactile_pressures = tactile::parse_pressure_values(*parsed);
if (auto matrix = tactile::parse_matrix_size_payload(*parsed)) {
decoded.tactile_matrix_size = matrix;
decoded->tactile_matrix_size = matrix;
}
}
}
FrameCallback callback_copy;
{
std::lock_guard lock(callback_mutex_);
std::lock_guard<std::mutex> lock(callback_mutex_);
callback_copy = frame_callback_;
}
if (callback_copy) {
@@ -508,11 +554,14 @@ struct CPStreamCore::Impl {
}
{
std::lock_guard lock(frame_mutex_);
std::lock_guard<std::mutex> lock(frame_mutex_);
if (frame_queue_.size() >= frame_queue_capacity_) {
frame_queue_.pop_front();
}
frame_queue_.push_back(std::move(decoded));
frame_queue_.push_back(decoded);
if (decoded->id == CPCodecID::Tactile) {
frame_record_queue_.push_back(decoded);
}
}
frame_cv_.notify_one();
} else if (rc == CP_ERROR_EAGAIN) {
@@ -536,7 +585,7 @@ struct CPStreamCore::Impl {
packet.flush = true;
packet.end_of_stream = end_of_stream;
{
std::lock_guard lock(packet_mutex_);
std::lock_guard<std::mutex> lock(packet_mutex_);
packet_queue_.push_back(std::move(packet));
}
packet_cv_.notify_one();
@@ -554,7 +603,7 @@ struct CPStreamCore::Impl {
}
void set_last_error(std::string message) {
std::lock_guard lock(last_error_mutex_);
std::lock_guard<std::mutex> lock(last_error_mutex_);
last_error_ = std::move(message);
}
@@ -574,9 +623,12 @@ struct CPStreamCore::Impl {
std::condition_variable packet_cv_;
std::deque<Packet> packet_queue_;
std::mutex frame_mutex_;
mutable std::mutex frame_mutex_;
std::condition_variable frame_cv_;
std::deque<DecodedFrame> frame_queue_;
// std::deque<DecodedFrame> frame_queue_;
// 更新为智能指针,我们需要更长的生命周期😊
std::deque<std::shared_ptr<DecodedFrame>> frame_queue_;
std::deque<std::shared_ptr<DecodedFrame>> frame_record_queue_;
std::size_t frame_queue_capacity_ = 16;
FrameCallback frame_callback_;
@@ -588,6 +640,8 @@ struct CPStreamCore::Impl {
std::string last_error_;
mutable std::mutex last_error_mutex_;
std::unique_ptr<persist::JsonWritter> frame_writer_;
};
CPStreamCore::CPStreamCore(CPStreamConfig config)
@@ -640,11 +694,11 @@ bool CPStreamCore::send(const std::uint8_t* data, std::size_t size) {
return impl_->send(data, size);
}
std::optional<DecodedFrame> CPStreamCore::try_pop_frame() {
std::optional<std::shared_ptr<DecodedFrame>> CPStreamCore::try_pop_frame() {
return impl_->try_pop_frame();
}
bool CPStreamCore::wait_for_frame(DecodedFrame& frame, std::chrono::milliseconds timeout) {
bool CPStreamCore::wait_for_frame(std::shared_ptr<DecodedFrame>& frame, std::chrono::milliseconds timeout) {
return impl_->wait_for_frame(frame, timeout);
}
@@ -656,6 +710,18 @@ void CPStreamCore::set_frame_queue_capacity(std::size_t capacity) {
impl_->set_frame_queue_capacity(capacity);
}
void CPStreamCore::clear_recorded_frames() {
impl_->clear_recorded_frames();
}
std::size_t CPStreamCore::recorded_frame_count() const {
return impl_->recorded_frame_count();
}
std::future<persist::WriteResult> CPStreamCore::export_recorded_frames(const std::string& path, bool clear_after_export) {
return impl_->export_recorded_frames(path, clear_after_export);
}
void CPStreamCore::set_frame_callback(FrameCallback callback) {
impl_->set_frame_callback(std::move(callback));
}