Merge branch 'dev'
# Conflicts: # components/ffmsep/cpstream_core.hh # components/view.cc
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
#include "components/ffmsep/cpstream_core.hh"
|
||||
|
||||
#include "components/ffmsep/presist/presist.hh"
|
||||
#include "dlog/dlog.hh"
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
@@ -7,20 +10,22 @@
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
|
||||
namespace ffmsep {
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr auto kReaderIdleSleep = std::chrono::milliseconds(5);
|
||||
constexpr auto kDecoderIdleSleep = std::chrono::milliseconds(1);
|
||||
constexpr auto kReaderIdleSleep = 5ms;
|
||||
constexpr auto kDecoderIdleSleep = 1ms;
|
||||
|
||||
const CPCodec* resolve_requested_codec(const CPStreamConfig& config) {
|
||||
if (!config.codec_name.empty()) {
|
||||
@@ -49,6 +54,7 @@ struct CPStreamCore::Impl {
|
||||
explicit Impl(CPStreamConfig config)
|
||||
: config_(std::move(config)) {
|
||||
normalize_config();
|
||||
frame_writer_ = std::make_unique<persist::JsonWritter>();
|
||||
}
|
||||
|
||||
~Impl() = default;
|
||||
@@ -64,7 +70,7 @@ struct CPStreamCore::Impl {
|
||||
config_.frame_queue_capacity = 1U;
|
||||
}
|
||||
if (config_.slave_request_interval.count() < 0) {
|
||||
config_.slave_request_interval = std::chrono::milliseconds{0};
|
||||
config_.slave_request_interval = 0ms;
|
||||
}
|
||||
frame_queue_capacity_ = config_.frame_queue_capacity;
|
||||
}
|
||||
@@ -116,7 +122,7 @@ struct CPStreamCore::Impl {
|
||||
serial->flush();
|
||||
|
||||
{
|
||||
std::lock_guard lock(serial_mutex_);
|
||||
std::lock_guard<std::mutex> lock(serial_mutex_);
|
||||
serial_ = std::move(serial);
|
||||
}
|
||||
} catch (const serial::IOException& ex) {
|
||||
@@ -140,11 +146,11 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(packet_mutex_);
|
||||
std::lock_guard<std::mutex> lock(packet_mutex_);
|
||||
packet_queue_.clear();
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(frame_mutex_);
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
frame_queue_.clear();
|
||||
}
|
||||
pts_counter_.store(0, std::memory_order_relaxed);
|
||||
@@ -165,7 +171,7 @@ struct CPStreamCore::Impl {
|
||||
stop();
|
||||
|
||||
{
|
||||
std::lock_guard lock(serial_mutex_);
|
||||
std::lock_guard<std::mutex> lock(serial_mutex_);
|
||||
if (serial_) {
|
||||
try {
|
||||
if (serial_->isOpen()) {
|
||||
@@ -185,12 +191,13 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(packet_mutex_);
|
||||
std::lock_guard<std::mutex> lock(packet_mutex_);
|
||||
packet_queue_.clear();
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(frame_mutex_);
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
frame_queue_.clear();
|
||||
frame_record_queue_.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,7 +208,7 @@ struct CPStreamCore::Impl {
|
||||
|
||||
std::shared_ptr<serial::Serial> serial_copy;
|
||||
{
|
||||
std::lock_guard lock(serial_mutex_);
|
||||
std::lock_guard<std::mutex> lock(serial_mutex_);
|
||||
serial_copy = serial_;
|
||||
}
|
||||
if (!serial_copy || !serial_copy->isOpen()) {
|
||||
@@ -250,7 +257,7 @@ struct CPStreamCore::Impl {
|
||||
stop_requested_.store(false, std::memory_order_release);
|
||||
|
||||
{
|
||||
std::lock_guard lock(packet_mutex_);
|
||||
std::lock_guard<std::mutex> lock(packet_mutex_);
|
||||
packet_queue_.clear();
|
||||
}
|
||||
|
||||
@@ -260,7 +267,7 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
bool is_open() const {
|
||||
std::lock_guard lock(serial_mutex_);
|
||||
std::lock_guard<std::mutex> lock(serial_mutex_);
|
||||
return serial_ && serial_->isOpen();
|
||||
}
|
||||
|
||||
@@ -279,7 +286,7 @@ struct CPStreamCore::Impl {
|
||||
|
||||
std::shared_ptr<serial::Serial> serial_copy;
|
||||
{
|
||||
std::lock_guard lock(serial_mutex_);
|
||||
std::lock_guard<std::mutex> lock(serial_mutex_);
|
||||
serial_copy = serial_;
|
||||
}
|
||||
|
||||
@@ -301,17 +308,17 @@ struct CPStreamCore::Impl {
|
||||
return false;
|
||||
}
|
||||
|
||||
std::optional<DecodedFrame> try_pop_frame() {
|
||||
std::lock_guard lock(frame_mutex_);
|
||||
std::optional<std::shared_ptr<DecodedFrame>> try_pop_frame() {
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
if (frame_queue_.empty()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
DecodedFrame frame = std::move(frame_queue_.front());
|
||||
std::shared_ptr<DecodedFrame> frame = std::move(frame_queue_.front());
|
||||
frame_queue_.pop_front();
|
||||
return frame;
|
||||
}
|
||||
|
||||
bool wait_for_frame(DecodedFrame& frame, std::chrono::milliseconds timeout) {
|
||||
bool wait_for_frame(std::shared_ptr<DecodedFrame>& frame, std::chrono::milliseconds timeout) {
|
||||
std::unique_lock lock(frame_mutex_);
|
||||
if (!frame_cv_.wait_for(lock, timeout, [&] {
|
||||
return !frame_queue_.empty();
|
||||
@@ -324,7 +331,7 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
void clear_frames() {
|
||||
std::lock_guard lock(frame_mutex_);
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
frame_queue_.clear();
|
||||
}
|
||||
|
||||
@@ -333,7 +340,7 @@ struct CPStreamCore::Impl {
|
||||
capacity = 1U;
|
||||
}
|
||||
{
|
||||
std::lock_guard lock(frame_mutex_);
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
frame_queue_capacity_ = capacity;
|
||||
config_.frame_queue_capacity = capacity;
|
||||
while (frame_queue_.size() > frame_queue_capacity_) {
|
||||
@@ -342,8 +349,46 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
}
|
||||
|
||||
void clear_recorded_frames() {
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
frame_record_queue_.clear();
|
||||
}
|
||||
|
||||
std::size_t recorded_frame_count() const {
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
return frame_record_queue_.size();
|
||||
}
|
||||
|
||||
std::future<persist::WriteResult> export_recorded_frames(const std::string& path, bool clear_after_export) {
|
||||
if (!frame_writer_) {
|
||||
frame_writer_ = std::make_unique<persist::JsonWritter>();
|
||||
}
|
||||
|
||||
std::deque<std::shared_ptr<DecodedFrame>> snapshot;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
snapshot = frame_record_queue_;
|
||||
if (clear_after_export) {
|
||||
frame_record_queue_.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if (snapshot.empty()) {
|
||||
std::promise<persist::WriteResult> promise;
|
||||
auto future = promise.get_future();
|
||||
promise.set_value(persist::WriteResult{
|
||||
false,
|
||||
"no recorded frames available",
|
||||
path
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
return frame_writer_->enqueue(path, std::move(snapshot));
|
||||
}
|
||||
|
||||
void set_frame_callback(FrameCallback callback) {
|
||||
std::lock_guard lock(callback_mutex_);
|
||||
std::lock_guard<std::mutex> lock(callback_mutex_);
|
||||
frame_callback_ = std::move(callback);
|
||||
}
|
||||
|
||||
@@ -352,7 +397,7 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
std::string last_error() const {
|
||||
std::lock_guard lock(last_error_mutex_);
|
||||
std::lock_guard<std::mutex> lock(last_error_mutex_);
|
||||
return last_error_;
|
||||
}
|
||||
|
||||
@@ -366,7 +411,7 @@ struct CPStreamCore::Impl {
|
||||
while (!stop_requested_.load(std::memory_order_acquire)) {
|
||||
std::shared_ptr<serial::Serial> serial_copy;
|
||||
{
|
||||
std::lock_guard lock(serial_mutex_);
|
||||
std::lock_guard<std::mutex> lock(serial_mutex_);
|
||||
serial_copy = serial_;
|
||||
}
|
||||
if (!serial_copy || !serial_copy->isOpen()) {
|
||||
@@ -401,7 +446,7 @@ struct CPStreamCore::Impl {
|
||||
packet.pts = pts_counter_.fetch_add(1, std::memory_order_relaxed);
|
||||
|
||||
{
|
||||
std::lock_guard lock(packet_mutex_);
|
||||
std::lock_guard<std::mutex> lock(packet_mutex_);
|
||||
if (packet_queue_.size() >= config_.packet_queue_capacity) {
|
||||
packet_queue_.pop_front();
|
||||
}
|
||||
@@ -415,7 +460,7 @@ struct CPStreamCore::Impl {
|
||||
const auto command = config_.slave_request_command;
|
||||
auto interval = config_.slave_request_interval;
|
||||
if (interval.count() < 0) {
|
||||
interval = std::chrono::milliseconds{0};
|
||||
interval = 0ms;
|
||||
}
|
||||
const bool repeat = interval.count() > 0;
|
||||
|
||||
@@ -485,23 +530,24 @@ struct CPStreamCore::Impl {
|
||||
CPFrame frame;
|
||||
rc = cpcodec_receive_frame(codec_ctx_, &frame);
|
||||
if (rc == CP_SUCCESS) {
|
||||
DecodedFrame decoded;
|
||||
decoded.pts = frame.pts;
|
||||
decoded.received_at = std::chrono::steady_clock::now();
|
||||
decoded.frame = std::move(frame);
|
||||
if (codec_descriptor_ && codec_descriptor_->id == CPCodecID::Tactile) {
|
||||
if (auto parsed = tactile::parse_frame(decoded.frame)) {
|
||||
decoded.tactile = parsed;
|
||||
decoded.tactile_pressures = tactile::parse_pressure_values(*parsed);
|
||||
auto decoded = std::make_shared<DecodedFrame>();
|
||||
decoded->pts = frame.pts;
|
||||
decoded->received_at = std::chrono::steady_clock::now();
|
||||
decoded->frame = std::move(frame);
|
||||
decoded->id = codec_descriptor_ ? codec_descriptor_->id : CPCodecID::Unknow;
|
||||
if (decoded->id == CPCodecID::Tactile) {
|
||||
if (auto parsed = tactile::parse_frame(decoded->frame)) {
|
||||
decoded->tactile = parsed;
|
||||
decoded->tactile_pressures = tactile::parse_pressure_values(*parsed);
|
||||
if (auto matrix = tactile::parse_matrix_size_payload(*parsed)) {
|
||||
decoded.tactile_matrix_size = matrix;
|
||||
decoded->tactile_matrix_size = matrix;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FrameCallback callback_copy;
|
||||
{
|
||||
std::lock_guard lock(callback_mutex_);
|
||||
std::lock_guard<std::mutex> lock(callback_mutex_);
|
||||
callback_copy = frame_callback_;
|
||||
}
|
||||
if (callback_copy) {
|
||||
@@ -509,11 +555,14 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(frame_mutex_);
|
||||
std::lock_guard<std::mutex> lock(frame_mutex_);
|
||||
if (frame_queue_.size() >= frame_queue_capacity_) {
|
||||
frame_queue_.pop_front();
|
||||
}
|
||||
frame_queue_.push_back(std::move(decoded));
|
||||
frame_queue_.push_back(decoded);
|
||||
if (decoded->id == CPCodecID::Tactile) {
|
||||
frame_record_queue_.push_back(decoded);
|
||||
}
|
||||
}
|
||||
frame_cv_.notify_one();
|
||||
} else if (rc == CP_ERROR_EAGAIN) {
|
||||
@@ -537,7 +586,7 @@ struct CPStreamCore::Impl {
|
||||
packet.flush = true;
|
||||
packet.end_of_stream = end_of_stream;
|
||||
{
|
||||
std::lock_guard lock(packet_mutex_);
|
||||
std::lock_guard<std::mutex> lock(packet_mutex_);
|
||||
packet_queue_.push_back(std::move(packet));
|
||||
}
|
||||
packet_cv_.notify_one();
|
||||
@@ -555,7 +604,7 @@ struct CPStreamCore::Impl {
|
||||
}
|
||||
|
||||
void set_last_error(std::string message) {
|
||||
std::lock_guard lock(last_error_mutex_);
|
||||
std::lock_guard<std::mutex> lock(last_error_mutex_);
|
||||
last_error_ = std::move(message);
|
||||
}
|
||||
|
||||
@@ -575,9 +624,12 @@ struct CPStreamCore::Impl {
|
||||
std::condition_variable packet_cv_;
|
||||
std::deque<Packet> packet_queue_;
|
||||
|
||||
std::mutex frame_mutex_;
|
||||
mutable std::mutex frame_mutex_;
|
||||
std::condition_variable frame_cv_;
|
||||
std::deque<DecodedFrame> frame_queue_;
|
||||
// std::deque<DecodedFrame> frame_queue_;
|
||||
// 更新为智能指针,我们需要更长的生命周期😊
|
||||
std::deque<std::shared_ptr<DecodedFrame>> frame_queue_;
|
||||
std::deque<std::shared_ptr<DecodedFrame>> frame_record_queue_;
|
||||
std::size_t frame_queue_capacity_ = 16;
|
||||
|
||||
FrameCallback frame_callback_;
|
||||
@@ -589,6 +641,8 @@ struct CPStreamCore::Impl {
|
||||
|
||||
std::string last_error_;
|
||||
mutable std::mutex last_error_mutex_;
|
||||
|
||||
std::unique_ptr<persist::JsonWritter> frame_writer_;
|
||||
};
|
||||
|
||||
CPStreamCore::CPStreamCore(CPStreamConfig config)
|
||||
@@ -641,11 +695,11 @@ bool CPStreamCore::send(const std::uint8_t* data, std::size_t size) {
|
||||
return impl_->send(data, size);
|
||||
}
|
||||
|
||||
std::optional<DecodedFrame> CPStreamCore::try_pop_frame() {
|
||||
std::optional<std::shared_ptr<DecodedFrame>> CPStreamCore::try_pop_frame() {
|
||||
return impl_->try_pop_frame();
|
||||
}
|
||||
|
||||
bool CPStreamCore::wait_for_frame(DecodedFrame& frame, std::chrono::milliseconds timeout) {
|
||||
bool CPStreamCore::wait_for_frame(std::shared_ptr<DecodedFrame>& frame, std::chrono::milliseconds timeout) {
|
||||
return impl_->wait_for_frame(frame, timeout);
|
||||
}
|
||||
|
||||
@@ -657,6 +711,18 @@ void CPStreamCore::set_frame_queue_capacity(std::size_t capacity) {
|
||||
impl_->set_frame_queue_capacity(capacity);
|
||||
}
|
||||
|
||||
void CPStreamCore::clear_recorded_frames() {
|
||||
impl_->clear_recorded_frames();
|
||||
}
|
||||
|
||||
std::size_t CPStreamCore::recorded_frame_count() const {
|
||||
return impl_->recorded_frame_count();
|
||||
}
|
||||
|
||||
std::future<persist::WriteResult> CPStreamCore::export_recorded_frames(const std::string& path, bool clear_after_export) {
|
||||
return impl_->export_recorded_frames(path, clear_after_export);
|
||||
}
|
||||
|
||||
void CPStreamCore::set_frame_callback(FrameCallback callback) {
|
||||
impl_->set_frame_callback(std::move(callback));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user