676 lines
19 KiB
C++
676 lines
19 KiB
C++
#include "components/ffmsep/cpstream_core.hh"
|
|
|
|
#include <algorithm>
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <cstdint>
|
|
#include <deque>
|
|
#include <memory>
|
|
#include <mutex>
|
|
#include <optional>
|
|
#include <thread>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
namespace ffmsep {
|
|
|
|
namespace {
|
|
|
|
// Pause used by the reader loop when it has nothing to read or a read failed, and by the slave loop between retries and as its sleep step.
constexpr auto kReaderIdleSleep = std::chrono::milliseconds(5);
|
|
// Pause used by the decoder loop when a packet was dequeued but the codec context is not open.
constexpr auto kDecoderIdleSleep = std::chrono::milliseconds(1);
|
|
|
|
/// Looks up the decoder requested by `config`.
///
/// The codec name is tried first when it is non-empty. If that lookup yields
/// nothing, the codec id is tried unless it is CPCodecID::Unknow.
///
/// @return the matching decoder descriptor, or nullptr when neither lookup succeeds.
const CPCodec* resolve_requested_codec(const CPStreamConfig& config) {
    const CPCodec* found = nullptr;

    if (!config.codec_name.empty()) {
        found = cpcodec_find_decoder_by_name(config.codec_name);
    }

    // Fall back to the numeric id only when the name did not resolve.
    if (found == nullptr && config.codec_id != CPCodecID::Unknow) {
        found = cpcodec_find_decoder(config.codec_id);
    }

    return found;
}
|
|
|
|
} // namespace
|
|
|
|
struct CPStreamCore::Impl {
|
|
/// Unit of work handed from the reader thread to the decoder thread.
struct Packet {
    std::vector<std::uint8_t> payload{};  ///< Raw bytes read from the serial port.
    std::int64_t pts{0};                  ///< Sequence number taken from pts_counter_ by the reader.
    bool end_of_stream{false};            ///< Tells the decoder loop to exit after this packet.
    bool flush{false};                    ///< Set on packets created by signal_decoder_flush().
};
|
|
|
|
/// Takes ownership of the supplied configuration and immediately replaces
/// unusable values through normalize_config().
explicit Impl(CPStreamConfig config)
    : config_(std::move(config)) {
    this->normalize_config();
}
|
|
|
|
// Defaulted: no teardown happens here; the CPStreamCore destructor calls stop() and close() before Impl is destroyed.
~Impl() = default;
|
|
|
|
/// Replaces unusable configuration values with safe minimums and mirrors the
/// frame queue capacity into frame_queue_capacity_.
void normalize_config() {
    // Queue capacities of zero are bumped to one.
    const auto at_least_one = [](auto& value) {
        if (value == 0U) {
            value = 1U;
        }
    };

    // A zero-sized read chunk falls back to 256 bytes.
    if (config_.read_chunk_size == 0U) {
        config_.read_chunk_size = 256U;
    }
    at_least_one(config_.packet_queue_capacity);
    at_least_one(config_.frame_queue_capacity);

    // A negative request interval is clamped to zero.
    const bool negative_interval = config_.slave_request_interval.count() < 0;
    if (negative_interval) {
        config_.slave_request_interval = std::chrono::milliseconds{0};
    }

    frame_queue_capacity_ = config_.frame_queue_capacity;
}
|
|
|
|
bool open(const CPStreamConfig& cfg) {
|
|
stop();
|
|
close();
|
|
|
|
config_ = cfg;
|
|
normalize_config();
|
|
|
|
if (config_.port.empty()) {
|
|
set_last_error("serial port is empty");
|
|
return false;
|
|
}
|
|
|
|
codec_descriptor_ = resolve_requested_codec(config_);
|
|
if (!codec_descriptor_) {
|
|
set_last_error("codec not found for requested identifier");
|
|
return false;
|
|
}
|
|
|
|
codec_ctx_ = cpcodec_alloc_context(codec_descriptor_);
|
|
if (!codec_ctx_) {
|
|
set_last_error("failed to allocate codec context");
|
|
return false;
|
|
}
|
|
|
|
int rc = cpcodec_open(codec_ctx_, codec_descriptor_);
|
|
if (rc < CP_SUCCESS) {
|
|
set_last_error("failed to open codec context: error " + std::to_string(rc));
|
|
cpcodec_free_context(&codec_ctx_);
|
|
codec_ctx_ = nullptr;
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
auto serial = std::make_shared<serial::Serial>(
|
|
config_.port,
|
|
config_.baudrate,
|
|
config_.timeout,
|
|
config_.bytesize,
|
|
config_.parity,
|
|
config_.stopbits,
|
|
config_.flowcontrol);
|
|
if (!serial->isOpen()) {
|
|
serial->open();
|
|
}
|
|
serial->flush();
|
|
|
|
{
|
|
std::lock_guard lock(serial_mutex_);
|
|
serial_ = std::move(serial);
|
|
}
|
|
} catch (const serial::IOException& ex) {
|
|
set_last_error(ex.what() ? ex.what() : "serial IO exception");
|
|
cpcodec_close(codec_ctx_);
|
|
cpcodec_free_context(&codec_ctx_);
|
|
codec_ctx_ = nullptr;
|
|
return false;
|
|
} catch (const serial::SerialException& ex) {
|
|
set_last_error(ex.what() ? ex.what() : "serial exception");
|
|
cpcodec_close(codec_ctx_);
|
|
cpcodec_free_context(&codec_ctx_);
|
|
codec_ctx_ = nullptr;
|
|
return false;
|
|
} catch (const std::exception& ex) {
|
|
set_last_error(ex.what());
|
|
cpcodec_close(codec_ctx_);
|
|
cpcodec_free_context(&codec_ctx_);
|
|
codec_ctx_ = nullptr;
|
|
return false;
|
|
}
|
|
|
|
{
|
|
std::lock_guard lock(packet_mutex_);
|
|
packet_queue_.clear();
|
|
}
|
|
{
|
|
std::lock_guard lock(frame_mutex_);
|
|
frame_queue_.clear();
|
|
}
|
|
pts_counter_.store(0, std::memory_order_relaxed);
|
|
stop_requested_.store(false, std::memory_order_release);
|
|
set_last_error({});
|
|
return true;
|
|
}
|
|
|
|
bool open() {
|
|
return open(config_);
|
|
}
|
|
|
|
bool reopen(const CPStreamConfig& cfg) {
|
|
return open(cfg);
|
|
}
|
|
|
|
void close() {
|
|
stop();
|
|
|
|
{
|
|
std::lock_guard lock(serial_mutex_);
|
|
if (serial_) {
|
|
try {
|
|
if (serial_->isOpen()) {
|
|
serial_->close();
|
|
}
|
|
} catch (...) {
|
|
// Ignore close errors.
|
|
}
|
|
serial_.reset();
|
|
}
|
|
}
|
|
|
|
if (codec_ctx_) {
|
|
cpcodec_close(codec_ctx_);
|
|
cpcodec_free_context(&codec_ctx_);
|
|
codec_ctx_ = nullptr;
|
|
}
|
|
|
|
{
|
|
std::lock_guard lock(packet_mutex_);
|
|
packet_queue_.clear();
|
|
}
|
|
{
|
|
std::lock_guard lock(frame_mutex_);
|
|
frame_queue_.clear();
|
|
}
|
|
}
|
|
|
|
bool start() {
|
|
if (running_.load(std::memory_order_acquire)) {
|
|
return true;
|
|
}
|
|
|
|
std::shared_ptr<serial::Serial> serial_copy;
|
|
{
|
|
std::lock_guard lock(serial_mutex_);
|
|
serial_copy = serial_;
|
|
}
|
|
if (!serial_copy || !serial_copy->isOpen()) {
|
|
set_last_error("serial port is not open");
|
|
return false;
|
|
}
|
|
|
|
if (!codec_ctx_ || !codec_ctx_->is_open) {
|
|
set_last_error("codec context is not ready");
|
|
return false;
|
|
}
|
|
|
|
stop_requested_.store(false, std::memory_order_release);
|
|
running_.store(true, std::memory_order_release);
|
|
|
|
reader_thread_ = std::thread(&Impl::reader_loop, this);
|
|
decoder_thread_ = std::thread(&Impl::decoder_loop, this);
|
|
if (!config_.slave_request_command.empty()) {
|
|
slave_thread_ = std::thread(&Impl::slave_loop, this);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void stop() {
|
|
if (!running_.exchange(false, std::memory_order_acq_rel)) {
|
|
return;
|
|
}
|
|
|
|
stop_requested_.store(true, std::memory_order_release);
|
|
packet_cv_.notify_all();
|
|
|
|
if (reader_thread_.joinable()) {
|
|
reader_thread_.join();
|
|
}
|
|
if (slave_thread_.joinable()) {
|
|
slave_thread_.join();
|
|
}
|
|
|
|
signal_decoder_flush(true);
|
|
packet_cv_.notify_all();
|
|
|
|
if (decoder_thread_.joinable()) {
|
|
decoder_thread_.join();
|
|
}
|
|
|
|
stop_requested_.store(false, std::memory_order_release);
|
|
|
|
{
|
|
std::lock_guard lock(packet_mutex_);
|
|
packet_queue_.clear();
|
|
}
|
|
|
|
if (codec_ctx_ && codec_ctx_->is_open) {
|
|
reset_decoder();
|
|
}
|
|
}
|
|
|
|
bool is_open() const {
|
|
std::lock_guard lock(serial_mutex_);
|
|
return serial_ && serial_->isOpen();
|
|
}
|
|
|
|
bool is_running() const {
|
|
return running_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
bool send(const std::vector<std::uint8_t>& data) {
|
|
return send(data.data(), data.size());
|
|
}
|
|
|
|
bool send(const std::uint8_t* data, std::size_t size) {
|
|
if (!data || size == 0U) {
|
|
return false;
|
|
}
|
|
|
|
std::shared_ptr<serial::Serial> serial_copy;
|
|
{
|
|
std::lock_guard lock(serial_mutex_);
|
|
serial_copy = serial_;
|
|
}
|
|
|
|
if (!serial_copy || !serial_copy->isOpen()) {
|
|
set_last_error("serial port is not open");
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
const auto written = serial_copy->write(data, size);
|
|
return written == size;
|
|
} catch (const serial::IOException& ex) {
|
|
set_last_error(ex.what() ? ex.what() : "serial IO exception");
|
|
} catch (const serial::SerialException& ex) {
|
|
set_last_error(ex.what() ? ex.what() : "serial exception");
|
|
} catch (const std::exception& ex) {
|
|
set_last_error(ex.what());
|
|
}
|
|
return false;
|
|
}
|
|
|
|
std::optional<DecodedFrame> try_pop_frame() {
|
|
std::lock_guard lock(frame_mutex_);
|
|
if (frame_queue_.empty()) {
|
|
return std::nullopt;
|
|
}
|
|
DecodedFrame frame = std::move(frame_queue_.front());
|
|
frame_queue_.pop_front();
|
|
return frame;
|
|
}
|
|
|
|
/// Waits up to `timeout` for a decoded frame and moves the oldest one into `frame`.
///
/// @return true when a frame was delivered; false when the wait timed out,
///         in which case `frame` is left untouched.
bool wait_for_frame(DecodedFrame& frame, std::chrono::milliseconds timeout) {
    std::unique_lock lock(frame_mutex_);

    const auto has_frame = [this] { return !frame_queue_.empty(); };
    const bool ready = frame_cv_.wait_for(lock, timeout, has_frame);
    if (ready) {
        frame = std::move(frame_queue_.front());
        frame_queue_.pop_front();
    }
    return ready;
}
|
|
|
|
void clear_frames() {
|
|
std::lock_guard lock(frame_mutex_);
|
|
frame_queue_.clear();
|
|
}
|
|
|
|
void set_frame_queue_capacity(std::size_t capacity) {
|
|
if (capacity == 0U) {
|
|
capacity = 1U;
|
|
}
|
|
{
|
|
std::lock_guard lock(frame_mutex_);
|
|
frame_queue_capacity_ = capacity;
|
|
config_.frame_queue_capacity = capacity;
|
|
while (frame_queue_.size() > frame_queue_capacity_) {
|
|
frame_queue_.pop_front();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Installs (or, with an empty callback, removes) the function the decoder
/// loop invokes for every decoded frame.
void set_frame_callback(FrameCallback callback) {
    {
        std::lock_guard lock(callback_mutex_);
        frame_callback_ = std::move(callback);
    }
}
|
|
|
|
/// Returns a copy of the stored (normalized) configuration.
CPStreamConfig config() const {
    CPStreamConfig snapshot = config_;
    return snapshot;
}
|
|
|
|
std::string last_error() const {
|
|
std::lock_guard lock(last_error_mutex_);
|
|
return last_error_;
|
|
}
|
|
|
|
static std::vector<serial::PortInfo> list_ports() {
|
|
return serial::list_ports();
|
|
}
|
|
|
|
void reader_loop() {
|
|
std::vector<std::uint8_t> buffer(config_.read_chunk_size);
|
|
|
|
while (!stop_requested_.load(std::memory_order_acquire)) {
|
|
std::shared_ptr<serial::Serial> serial_copy;
|
|
{
|
|
std::lock_guard lock(serial_mutex_);
|
|
serial_copy = serial_;
|
|
}
|
|
if (!serial_copy || !serial_copy->isOpen()) {
|
|
std::this_thread::sleep_for(kReaderIdleSleep);
|
|
continue;
|
|
}
|
|
|
|
std::size_t bytes_read = 0;
|
|
try {
|
|
bytes_read = serial_copy->read(buffer.data(), buffer.size());
|
|
} catch (const serial::IOException& ex) {
|
|
set_last_error(ex.what() ? ex.what() : "serial IO exception");
|
|
std::this_thread::sleep_for(kReaderIdleSleep);
|
|
continue;
|
|
} catch (const serial::SerialException& ex) {
|
|
set_last_error(ex.what() ? ex.what() : "serial exception");
|
|
std::this_thread::sleep_for(kReaderIdleSleep);
|
|
continue;
|
|
} catch (const std::exception& ex) {
|
|
set_last_error(ex.what());
|
|
std::this_thread::sleep_for(kReaderIdleSleep);
|
|
continue;
|
|
}
|
|
|
|
if (bytes_read == 0U) {
|
|
std::this_thread::sleep_for(kReaderIdleSleep);
|
|
continue;
|
|
}
|
|
|
|
Packet packet;
|
|
packet.payload.assign(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(bytes_read));
|
|
packet.pts = pts_counter_.fetch_add(1, std::memory_order_relaxed);
|
|
|
|
{
|
|
std::lock_guard lock(packet_mutex_);
|
|
if (packet_queue_.size() >= config_.packet_queue_capacity) {
|
|
packet_queue_.pop_front();
|
|
}
|
|
packet_queue_.push_back(std::move(packet));
|
|
}
|
|
packet_cv_.notify_one();
|
|
}
|
|
}
|
|
|
|
void slave_loop() {
|
|
const auto command = config_.slave_request_command;
|
|
auto interval = config_.slave_request_interval;
|
|
if (interval.count() < 0) {
|
|
interval = std::chrono::milliseconds{0};
|
|
}
|
|
const bool repeat = interval.count() > 0;
|
|
|
|
while (!stop_requested_.load(std::memory_order_acquire)) {
|
|
const bool success = send(command);
|
|
if (!success) {
|
|
std::this_thread::sleep_for(kReaderIdleSleep);
|
|
continue;
|
|
}
|
|
if (!repeat) {
|
|
break;
|
|
}
|
|
|
|
auto remaining = interval;
|
|
while (remaining.count() > 0 && !stop_requested_.load(std::memory_order_acquire)) {
|
|
const auto step = std::min(remaining, kReaderIdleSleep);
|
|
std::this_thread::sleep_for(step);
|
|
remaining -= step;
|
|
}
|
|
}
|
|
}
|
|
|
|
void decoder_loop() {
|
|
while (true) {
|
|
Packet packet;
|
|
{
|
|
std::unique_lock lock(packet_mutex_);
|
|
packet_cv_.wait(lock, [&] {
|
|
return stop_requested_.load(std::memory_order_acquire) || !packet_queue_.empty();
|
|
});
|
|
|
|
if (packet_queue_.empty()) {
|
|
if (stop_requested_.load(std::memory_order_acquire)) {
|
|
break;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
packet = std::move(packet_queue_.front());
|
|
packet_queue_.pop_front();
|
|
}
|
|
|
|
if (!codec_ctx_ || !codec_ctx_->is_open) {
|
|
if (packet.end_of_stream) {
|
|
break;
|
|
}
|
|
std::this_thread::sleep_for(kDecoderIdleSleep);
|
|
continue;
|
|
}
|
|
|
|
CPPacket cp_packet;
|
|
cp_packet.payload = std::move(packet.payload);
|
|
cp_packet.pts = packet.pts;
|
|
cp_packet.dts = packet.pts;
|
|
cp_packet.end_of_stream = packet.end_of_stream;
|
|
cp_packet.flush = packet.flush;
|
|
|
|
int rc = cpcodec_send_packet(codec_ctx_, &cp_packet);
|
|
if (rc < CP_SUCCESS) {
|
|
if (packet.end_of_stream) {
|
|
break;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
while (true) {
|
|
CPFrame frame;
|
|
rc = cpcodec_receive_frame(codec_ctx_, &frame);
|
|
if (rc == CP_SUCCESS) {
|
|
DecodedFrame decoded;
|
|
decoded.pts = frame.pts;
|
|
decoded.received_at = std::chrono::steady_clock::now();
|
|
decoded.frame = std::move(frame);
|
|
if (codec_descriptor_ && codec_descriptor_->id == CPCodecID::Tactile) {
|
|
if (auto parsed = tactile::parse_frame(decoded.frame)) {
|
|
decoded.tactile = parsed;
|
|
decoded.tactile_pressures = tactile::parse_pressure_values(*parsed);
|
|
if (auto matrix = tactile::parse_matrix_size_payload(*parsed)) {
|
|
decoded.tactile_matrix_size = matrix;
|
|
}
|
|
}
|
|
}
|
|
|
|
FrameCallback callback_copy;
|
|
{
|
|
std::lock_guard lock(callback_mutex_);
|
|
callback_copy = frame_callback_;
|
|
}
|
|
if (callback_copy) {
|
|
callback_copy(decoded);
|
|
}
|
|
|
|
{
|
|
std::lock_guard lock(frame_mutex_);
|
|
if (frame_queue_.size() >= frame_queue_capacity_) {
|
|
frame_queue_.pop_front();
|
|
}
|
|
frame_queue_.push_back(std::move(decoded));
|
|
}
|
|
frame_cv_.notify_one();
|
|
} else if (rc == CP_ERROR_EAGAIN) {
|
|
break;
|
|
} else {
|
|
if (rc == CP_ERROR_EOF && packet.end_of_stream) {
|
|
return;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (packet.end_of_stream) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void signal_decoder_flush(bool end_of_stream) {
|
|
Packet packet;
|
|
packet.flush = true;
|
|
packet.end_of_stream = end_of_stream;
|
|
{
|
|
std::lock_guard lock(packet_mutex_);
|
|
packet_queue_.push_back(std::move(packet));
|
|
}
|
|
packet_cv_.notify_one();
|
|
}
|
|
|
|
void reset_decoder() {
|
|
if (!codec_ctx_ || !codec_descriptor_) {
|
|
return;
|
|
}
|
|
cpcodec_close(codec_ctx_);
|
|
int rc = cpcodec_open(codec_ctx_, codec_descriptor_);
|
|
if (rc < CP_SUCCESS) {
|
|
set_last_error("failed to reset codec context: error " + std::to_string(rc));
|
|
}
|
|
}
|
|
|
|
void set_last_error(std::string message) {
|
|
std::lock_guard lock(last_error_mutex_);
|
|
last_error_ = std::move(message);
|
|
}
|
|
|
|
CPStreamConfig config_{};
|
|
const CPCodec* codec_descriptor_ = nullptr;
|
|
|
|
std::shared_ptr<serial::Serial> serial_;
|
|
mutable std::mutex serial_mutex_;
|
|
|
|
CPCodecContext* codec_ctx_ = nullptr;
|
|
|
|
std::thread reader_thread_;
|
|
std::thread slave_thread_;
|
|
std::thread decoder_thread_;
|
|
|
|
std::mutex packet_mutex_;
|
|
std::condition_variable packet_cv_;
|
|
std::deque<Packet> packet_queue_;
|
|
|
|
std::mutex frame_mutex_;
|
|
std::condition_variable frame_cv_;
|
|
std::deque<DecodedFrame> frame_queue_;
|
|
std::size_t frame_queue_capacity_ = 16;
|
|
|
|
FrameCallback frame_callback_;
|
|
mutable std::mutex callback_mutex_;
|
|
|
|
std::atomic<bool> running_{false};
|
|
std::atomic<bool> stop_requested_{false};
|
|
std::atomic<std::int64_t> pts_counter_{0};
|
|
|
|
std::string last_error_;
|
|
mutable std::mutex last_error_mutex_;
|
|
};
|
|
|
|
/// Creates the implementation object, handing it ownership of `config`.
CPStreamCore::CPStreamCore(CPStreamConfig config)
    : impl_(std::make_unique<Impl>(std::move(config))) {
}
|
|
|
|
/// Stops the workers and closes the stream before the implementation is destroyed.
CPStreamCore::~CPStreamCore() {
    if (!impl_) {
        return;
    }
    impl_->stop();
    impl_->close();
}
|
|
|
|
bool CPStreamCore::open(const CPStreamConfig& config) {
|
|
return impl_->open(config);
|
|
}
|
|
|
|
bool CPStreamCore::open() {
|
|
return impl_->open();
|
|
}
|
|
|
|
bool CPStreamCore::reopen(const CPStreamConfig& config) {
|
|
return impl_->reopen(config);
|
|
}
|
|
|
|
void CPStreamCore::close() {
|
|
impl_->close();
|
|
}
|
|
|
|
bool CPStreamCore::start() {
|
|
return impl_->start();
|
|
}
|
|
|
|
void CPStreamCore::stop() {
|
|
impl_->stop();
|
|
}
|
|
|
|
bool CPStreamCore::is_open() const noexcept {
|
|
return impl_->is_open();
|
|
}
|
|
|
|
bool CPStreamCore::is_running() const noexcept {
|
|
return impl_->is_running();
|
|
}
|
|
|
|
bool CPStreamCore::send(const std::vector<std::uint8_t>& data) {
|
|
return impl_->send(data);
|
|
}
|
|
|
|
bool CPStreamCore::send(const std::uint8_t* data, std::size_t size) {
|
|
return impl_->send(data, size);
|
|
}
|
|
|
|
std::optional<DecodedFrame> CPStreamCore::try_pop_frame() {
|
|
return impl_->try_pop_frame();
|
|
}
|
|
|
|
/// Blocking frame retrieval with a timeout; forwards to Impl::wait_for_frame().
bool CPStreamCore::wait_for_frame(DecodedFrame& frame, std::chrono::milliseconds timeout) {
    const bool delivered = impl_->wait_for_frame(frame, timeout);
    return delivered;
}
|
|
|
|
void CPStreamCore::clear_frames() {
|
|
impl_->clear_frames();
|
|
}
|
|
|
|
void CPStreamCore::set_frame_queue_capacity(std::size_t capacity) {
|
|
impl_->set_frame_queue_capacity(capacity);
|
|
}
|
|
|
|
/// Installs the per-frame callback; moves it into Impl::set_frame_callback().
void CPStreamCore::set_frame_callback(FrameCallback callback) {
    Impl& impl = *impl_;
    impl.set_frame_callback(std::move(callback));
}
|
|
|
|
/// Returns a copy of the active configuration; forwards to Impl::config().
CPStreamConfig CPStreamCore::config() const {
    const Impl& impl = *impl_;
    return impl.config();
}
|
|
|
|
std::string CPStreamCore::last_error() const {
|
|
return impl_->last_error();
|
|
}
|
|
|
|
std::vector<serial::PortInfo> CPStreamCore::list_available_ports() {
|
|
return Impl::list_ports();
|
|
}
|
|
|
|
} // namespace ffmsep
|