// // Created by Lenn on 2025/10/31. // #include "components/ffmsep/presist/presist.hh" #include "components/ffmsep/cpstream_core.hh" #include #include #include #include #include #include #include namespace ffmsep::persist { namespace { using nlohmann::json; bool is_simple_array(const json& value) { if (!value.is_array()) { return false; } return std::all_of(value.begin(), value.end(), [](const json& item) { return item.is_primitive(); }); } void dump_compact_json(std::ostream& out, const json& value, int indent = 0, int indent_step = 2) { const auto indent_str = std::string(static_cast(indent), ' '); const auto child_indent = indent + indent_step; const auto child_indent_str = std::string(static_cast(child_indent), ' '); if (value.is_object()) { out << "{\n"; bool first = true; for (auto it = value.begin(); it != value.end(); ++it) { if (!first) { out << ",\n"; } first = false; out << child_indent_str << json(it.key()).dump() << ": "; dump_compact_json(out, it.value(), child_indent, indent_step); } out << '\n' << indent_str << '}'; return; } if (value.is_array()) { if (value.empty()) { out << "[]"; return; } if (is_simple_array(value)) { out << '['; for (std::size_t idx = 0; idx < value.size(); ++idx) { if (idx != 0U) { out << ", "; } out << value[static_cast(idx)].dump(); } out << ']'; return; } out << "[\n"; bool first = true; for (const auto& item : value) { if (!first) { out << ",\n"; } first = false; out << child_indent_str; dump_compact_json(out, item, child_indent, indent_step); } out << '\n' << indent_str << ']'; return; } out << value.dump(); } json serialize_tactile_frame(const DecodedFrame& frame) { json result = { {"pts", frame.pts}, {"raw_frame", frame.frame.data}, {"pressures", frame.tactile_pressures}, }; const auto received = frame.received_at.time_since_epoch(); result["received_at_ns"] = std::chrono::duration_cast(received).count(); if (frame.tactile_matrix_size) { result["matrix"] = { {"long_edge", frame.tactile_matrix_size->long_edge}, {"short_edge", frame.tactile_matrix_size->short_edge}, }; } if (frame.tactile) { const auto& tactile = *frame.tactile; result["tactile"] = { {"device_address", tactile.device_address}, {"response_function", tactile.response_function}, {"function", static_cast(tactile.function)}, {"start_address", tactile.start_address}, {"return_byte_count", tactile.return_byte_count}, {"status", tactile.status}, {"payload", tactile.payload}, }; } return result; } } // namespace bool WriteQueue::push(WriteRequest&& req) { { std::lock_guard lock(mutex_); if (stopped_) { return false; } queue_.push(std::move(req)); } cond_.notify_one(); return true; } bool WriteQueue::pop(WriteRequest& out) { std::unique_lock lock(mutex_); cond_.wait(lock, [&] { return stopped_ || !queue_.empty(); }); if (queue_.empty()) { return false; } out = std::move(queue_.front()); queue_.pop(); return true; } void WriteQueue::stop() { { std::lock_guard lock(mutex_); stopped_ = true; } cond_.notify_all(); } JsonWritter::JsonWritter() : write_thread_([this] { run(); }) {} JsonWritter::~JsonWritter() { stop(); } std::future JsonWritter::enqueue(std::string path, std::deque> frames) { std::promise promise; auto future = promise.get_future(); WriteRequest request{std::move(path), std::move(frames), std::move(promise)}; if (!write_queue_.push(std::move(request))) { WriteResult result{false, "writer has been stopped", request.path}; request.promise.set_value(std::move(result)); } return future; } void JsonWritter::run() { WriteRequest request; while (write_queue_.pop(request)) { try { auto result = write_once(request.path, std::move(request.frames)); request.promise.set_value(std::move(result)); } catch (const std::exception& ex) { request.promise.set_value(WriteResult{false, ex.what(), request.path}); } catch (...) { request.promise.set_value(WriteResult{false, "unknown error", request.path}); } } } WriteResult JsonWritter::write_once(const std::string& path, std::deque> frames) { if (path.empty()) { return {false, "export path is empty", path}; } json tactile_frames = json::array(); for (const auto& frame : frames) { if (!frame) { continue; } if (frame->id != CPCodecID::Tactile || !frame->tactile) { continue; } tactile_frames.push_back(serialize_tactile_frame(*frame)); } if (tactile_frames.empty()) { return {false, "no tactile frames available for export", path}; } json root; root["codec"] = "tactile"; root["frames"] = std::move(tactile_frames); std::filesystem::path fs_path(path); if (fs_path.has_parent_path()) { std::error_code ec; std::filesystem::create_directories(fs_path.parent_path(), ec); if (ec) { return {false, "failed to create export directory: " + ec.message(), path}; } } std::ofstream stream(path, std::ios::binary | std::ios::trunc); if (!stream.is_open()) { return {false, "failed to open export file", path}; } dump_compact_json(stream, root); stream << '\n'; stream.flush(); if (!stream.good()) { return {false, "failed to write export file", path}; } return {true, {}, path}; } void JsonWritter::stop() { if (!stopped_.exchange(true)) { write_queue_.stop(); if (write_thread_.joinable()) { write_thread_.join(); } } } } // namespace ffmsep::persist