Files
ts-qt/components/ffmsep/presist/presist.cc

315 lines
9.1 KiB
C++

//
// Created by Lenn on 2025/10/31.
//
#include "components/ffmsep/presist/presist.hh"
#include "components/ffmsep/cpstream_core.hh"
#include <algorithm>
#include <chrono>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <nlohmann/json_fwd.hpp>
#include <sstream>
#include <system_error>
#include <nlohmann/json.hpp>
namespace ffmsep::persist {
namespace {
using nlohmann::json;
// 旧的 JSON 导出实现保留在此,避免直接删除,便于回退。
// bool is_simple_array(const json& value) { ... }
// void dump_compact_json(...)
// json serialize_tactile_frame(const DecodedFrame& frame) { ... }
std::string payload_to_csv_row(const std::string& timestamp,
const std::vector<std::uint8_t>& payload) {
// First column: receive local time (YYYYMMDDHHMMSS). Then payload every 2 bytes -> uint16.
std::ostringstream oss;
oss << timestamp;
bool first = false; // timestamp already placed
for (std::size_t idx = 0; idx + 1U < payload.size(); idx += 2U) {
const auto value =
static_cast<std::uint16_t>(payload[idx]) | static_cast<std::uint16_t>(payload[idx + 1U] << 8U);
if (!first) {
oss << ',';
}
first = false;
oss << value;
}
return oss.str();
}
} // namespace
namespace {
using nlohmann::json;
std::string format_receive_time(const std::chrono::steady_clock::time_point& received) {
// Map steady_clock timestamp to system_clock using the current offset, then format as YYYYMMDDHHMMSS.
const auto now_sys = std::chrono::system_clock::now();
const auto now_steady = std::chrono::steady_clock::now();
const auto steady_delta = received - now_steady;
const auto sys_tp = now_sys + steady_delta;
const auto sys_ms_tp = std::chrono::time_point_cast<std::chrono::milliseconds>(sys_tp);
const auto ms_part = std::chrono::duration_cast<std::chrono::milliseconds>(sys_ms_tp.time_since_epoch()) % 1000;
const auto sys_sec_tp = std::chrono::time_point_cast<std::chrono::seconds>(sys_ms_tp);
const std::time_t tt = std::chrono::system_clock::to_time_t(sys_sec_tp);
std::tm tm{};
#if defined(_WIN32)
localtime_s(&tm, &tt);
#else
localtime_r(&tt, &tm);
#endif
std::ostringstream oss;
oss << std::put_time(&tm, "%Y%m%d%H%M%S")
<< std::setw(3) << std::setfill('0') << ms_part.count();
return oss.str();
}
bool is_simple_array(const json& value) {
if (!value.is_array()) {
return false;
}
return std::all_of(value.begin(), value.end(), [](const json& item) {
return item.is_primitive();
});
}
void dump_compact_json(std::ostream& out,
const json& value,
int indent = 0,
int indent_step = 2) {
const auto indent_str = std::string(static_cast<std::size_t>(indent), ' ');
const auto child_indent = indent + indent_step;
const auto child_indent_str = std::string(static_cast<std::size_t>(child_indent), ' ');
if (value.is_object()) {
out << "{\n";
bool first = true;
for (auto it = value.begin(); it != value.end(); ++it) {
if (!first) {
out << ",\n";
}
first = false;
out << child_indent_str << json(it.key()).dump() << ": ";
dump_compact_json(out, it.value(), child_indent, indent_step);
}
out << '\n'
<< indent_str << '}';
return;
}
if (value.is_array()) {
if (value.empty()) {
out << "[]";
return;
}
if (is_simple_array(value)) {
out << '[';
for (std::size_t idx = 0; idx < value.size(); ++idx) {
if (idx != 0U) {
out << ", ";
}
out << value[static_cast<json::size_type>(idx)].dump();
}
out << ']';
return;
}
out << "[\n";
bool first = true;
for (const auto& item: value) {
if (!first) {
out << ",\n";
}
first = false;
out << child_indent_str;
dump_compact_json(out, item, child_indent, indent_step);
}
out << '\n'
<< indent_str << ']';
return;
}
out << value.dump();
}
json serialize_tactile_frame(const DecodedFrame& frame) {
json result = {
{ "pts", frame.pts },
{ "raw_frame", frame.frame.data },
{ "pressures", frame.tactile_pressures },
};
const auto received = frame.received_at.time_since_epoch();
result["received_at_ns"] =
std::chrono::duration_cast<std::chrono::nanoseconds>(received).count();
if (frame.tactile_matrix_size) {
result["matrix"] = {
{ "long_edge", frame.tactile_matrix_size->long_edge },
{ "short_edge", frame.tactile_matrix_size->short_edge },
};
}
if (frame.tactile) {
const auto& tactile = *frame.tactile;
result["tactile"] = {
{ "device_address", tactile.device_address },
{ "response_function", tactile.response_function },
{ "function", static_cast<std::uint8_t>(tactile.function) },
{ "start_address", tactile.start_address },
{ "return_byte_count", tactile.return_byte_count },
{ "status", tactile.status },
{ "payload", tactile.payload },
};
}
return result;
}
} // namespace
bool WriteQueue::push(WriteRequest&& req) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (stopped_) {
return false;
}
queue_.push(std::move(req));
}
cond_.notify_one();
return true;
}
bool WriteQueue::pop(WriteRequest& out) {
std::unique_lock lock(mutex_);
cond_.wait(lock, [&] {
return stopped_ || !queue_.empty();
});
if (queue_.empty()) {
return false;
}
out = std::move(queue_.front());
queue_.pop();
return true;
}
void WriteQueue::stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
stopped_ = true;
}
cond_.notify_all();
}
JsonWritter::JsonWritter(): write_thread_([this] { run(); }) {}
JsonWritter::~JsonWritter() {
stop();
}
std::future<WriteResult> JsonWritter::enqueue(std::string path,
std::deque<std::shared_ptr<DecodedFrame>> frames) {
std::promise<WriteResult> promise;
auto future = promise.get_future();
WriteRequest request{ std::move(path), std::move(frames), std::move(promise) };
if (!write_queue_.push(std::move(request))) {
WriteResult result{ false, "writer has been stopped", request.path };
request.promise.set_value(std::move(result));
}
return future;
}
void JsonWritter::run() {
WriteRequest request;
while (write_queue_.pop(request)) {
try {
auto result = write_once(request.path, std::move(request.frames));
request.promise.set_value(std::move(result));
}
catch (const std::exception& ex) {
request.promise.set_value(WriteResult{ false, ex.what(), request.path });
}
catch (...) {
request.promise.set_value(WriteResult{ false, "unknown error", request.path });
}
}
}
WriteResult JsonWritter::write_once(const std::string& path,
std::deque<std::shared_ptr<DecodedFrame>> frames) {
if (path.empty()) {
return { false, "export path is empty", path };
}
std::filesystem::path fs_path(path);
if (fs_path.has_parent_path()) {
std::error_code ec;
std::filesystem::create_directories(fs_path.parent_path(), ec);
if (ec) {
return { false, "failed to create export directory: " + ec.message(), path };
}
}
std::ofstream stream(path, std::ios::binary | std::ios::trunc);
if (!stream.is_open()) {
return { false, "failed to open export file", path };
}
bool wrote_any = false;
for (const auto& frame: frames) {
if (!frame) {
continue;
}
std::vector<std::uint8_t> payload;
if (frame->id == CPCodecID::Tactile && frame->tactile) {
payload = frame->tactile->payload;
}
else if (frame->id == CPCodecID::PiezoresistiveB) {
payload = tactile::extract_piezoresistive_b_payload(frame->frame);
}
if (payload.empty()) {
continue;
}
const auto timestamp = format_receive_time(frame->received_at);
const auto row = payload_to_csv_row(timestamp, payload);
stream << row << '\n';
wrote_any = true;
}
stream.flush();
if (!stream.good()) {
return { false, "failed to write export file", path };
}
if (!wrote_any) {
return { false, "no tactile frames available for export", path };
}
return { true, {}, path };
}
void JsonWritter::stop() {
if (!stopped_.exchange(true)) {
write_queue_.stop();
if (write_thread_.joinable()) {
write_thread_.join();
}
}
}
} // namespace ffmsep::persist