Files
2025-12-22 00:51:03 +08:00

193 lines
5.6 KiB
C++

#ifndef MQTT_APP_H
#define MQTT_APP_H
#include <iostream>
#include <memory>
#include <string_view>
#include <string>
#include "mqtt.hh"
#include <fstream>
#include <string>
#include <memory.h>
#include <format>
#include "localstatusclient.h"
namespace {
// Reads the whole file at `path`, joins its lines without any separator, and
// removes trailing whitespace from the result.
//
// Returns an empty string when the file cannot be opened, is empty, or holds
// only whitespace.
std::string read_file_trim(const std::string& path) {
    std::ifstream ifs(path);
    if (!ifs) return {};

    std::string content;
    std::string line;
    while (std::getline(ifs, line)) {
        // getline removes only '\n'; drop the '\r' left behind by a CRLF
        // line ending so it does not end up embedded between joined lines.
        if (!line.empty() && line.back() == '\r') line.pop_back();
        content += line;
    }

    // Simple trim: cut everything after the last non-whitespace character
    // (tabs, vertical tabs and form feeds included, not only spaces/CR/LF).
    const std::string::size_type last = content.find_last_not_of(" \t\r\n\v\f");
    content.erase(last == std::string::npos ? 0 : last + 1);
    return content;
}
// Returns the machine id read from /etc/machine-id, or from
// /var/lib/dbus/machine-id when the first file yields nothing.
// The result is empty when neither file provides a value.
std::string get_machine_id() {
    std::string id;
    for (const char* candidate : {"/etc/machine-id", "/var/lib/dbus/machine-id"}) {
        id = read_file_trim(candidate);
        if (!id.empty()) {
            break;  // first non-empty source wins
        }
    }
    return id;
}
} // namespace
namespace af::mqtt::v1 {
// Deployment environment of a device. env_to_sv() maps each value to the
// lowercase name that appears as the environment segment of a topic.
enum class Env {
Dev, // "dev"
Test, // "test"
Prod // "prod"
};
// Maps an Env to its lowercase name: "dev", "test" or "prod".
// A value that matches none of the enumerators is reported as "test",
// the same as Env::Test.
inline std::string_view env_to_sv(Env e) noexcept {
    if (e == Env::Dev) {
        return "dev";
    }
    if (e == Env::Prod) {
        return "prod";
    }
    // Env::Test and any out-of-range value.
    return "test";
}
struct TopicContext {
Env env{ Env::Test };
std::string deviceId = get_machine_id();
TopicContext() = default;
TopicContext(Env e): env(e) {}
};
// Static builders for the topic strings that live under
// "af/v1/<env>/<deviceId>", plus named constants for common path segments.
class Topics {
public:
    // Root of every topic: af/v1/<env>/<deviceId>
    static std::string base(const TopicContext& c) {
        const std::string_view env_name = env_to_sv(c.env);
        return std::format("af/v1/{}/{}", env_name, c.deviceId);
    }

    // <base>/state
    static std::string state(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/state", root);
    }

    // <base>/telemetry (retain=false)
    static std::string telemetry(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/telemetry", root);
    }

    // <base>/event/<type>, e.g. boot, feed_done, jam, low_food, lid_open, fault, ota
    static std::string event(const TopicContext& c, std::string_view type) {
        const std::string root = base(c);
        return std::format("{}/event/{}", root, type);
    }

    // <base>/cmd/ack/<cmd>, e.g. feed, stop_feed, feed_plan_set
    static std::string cmd_ack(const TopicContext& c, std::string_view cmd) {
        const std::string root = base(c);
        return std::format("{}/cmd/ack/{}", root, cmd);
    }

    // <base>/cmd/res/<cmd>
    static std::string cmd_res(const TopicContext& c, std::string_view cmd) {
        const std::string root = base(c);
        return std::format("{}/cmd/res/{}", root, cmd);
    }

    // <base>/config/ack
    static std::string config_ack(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/config/ack", root);
    }

    // <base>/stream/state
    static std::string stream_state(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/stream/state", root);
    }

    // <base>/stream/event/<type>
    static std::string stream_event(const TopicContext& c, std::string_view type) {
        const std::string root = base(c);
        return std::format("{}/stream/event/{}", root, type);
    }

    // <base>/log/<level>, level is one of debug|info|warn|error
    static std::string log(const TopicContext& c, std::string_view level) {
        const std::string root = base(c);
        return std::format("{}/log/{}", root, level);
    }

    // ---------- Server -> Device (subscribe) ----------

    // Filter covering every command topic: <base>/cmd/+
    static std::string sub_cmd_all(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/cmd/+", root);
    }

    // One specific command topic, for debugging/testing: <base>/cmd/<cmd>
    static std::string cmd(const TopicContext& c, std::string_view cmd) {
        const std::string root = base(c);
        return std::format("{}/cmd/{}", root, cmd);
    }

    // <base>/config/set
    static std::string sub_config_set(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/config/set", root);
    }

    // <base>/ota/notify
    static std::string sub_ota_notify(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/ota/notify", root);
    }

    // Filter covering every stream command topic: <base>/stream/cmd/+
    static std::string sub_stream_cmd_all(const TopicContext& c) {
        const std::string root = base(c);
        return std::format("{}/stream/cmd/+", root);
    }

    // One specific stream command topic, for testing: <base>/stream/cmd/<cmd> (start/stop)
    static std::string stream_cmd(const TopicContext& c, std::string_view cmd) {
        const std::string root = base(c);
        return std::format("{}/stream/cmd/{}", root, cmd);
    }

    // ---------- Helpers (recommended topic constants) ----------

    // Common command names, usable as the <cmd> segment.
    static constexpr std::string_view CMD_FEED{ "feed" };
    static constexpr std::string_view CMD_STOP_FEED{ "stop_feed" };
    static constexpr std::string_view CMD_PLAN_SET{ "feed_plan_set" };
    static constexpr std::string_view CMD_PLAN_GET{ "feed_plan_get" };
    static constexpr std::string_view CMD_PING{ "ping" };

    // Common event types, usable as the <type> segment of event().
    static constexpr std::string_view EVT_BOOT{ "boot" };
    static constexpr std::string_view EVT_FEED_DONE{ "feed_done" };
    static constexpr std::string_view EVT_JAM{ "jam" };
    static constexpr std::string_view EVT_LOW_FOOD{ "low_food" };
    static constexpr std::string_view EVT_LID_OPEN{ "lid_open" };
    static constexpr std::string_view EVT_FAULT{ "fault" };
    static constexpr std::string_view EVT_OTA{ "ota" };

    // Common stream event types, usable as the <type> segment of stream_event().
    static constexpr std::string_view STRM_EVT_ERROR{ "error" };
    static constexpr std::string_view STRM_EVT_CLIENT_JOIN{ "client_join" };
    static constexpr std::string_view STRM_EVT_CLIENT_LEAVE{ "client_leave" };
    static constexpr std::string_view STRM_EVT_BITRATE_DROP{ "bitrate_drop" };
};
// Device state flags. Every enumerator is a distinct single bit, so several
// values can be OR-ed into one mask.
// NOTE(review): presumably that mask is the uint8_t taken by
// MqttApp::encode_state_json — confirm against the implementation.
enum class State {
Online = 0x01, // bit 0
Free = 0x02, // bit 1
Feed = 0x04, // bit 2
Push = 0x08, // bit 3
};
// Application object that holds an MqttClient, a LocalStatusClient and the
// TopicContext used to build topic names. Only declarations are visible in
// this header; the member functions are defined elsewhere.
// NOTE(review): the public section is empty and no constructor is declared
// here, so nothing visible assigns mqtt_client_ or rpc_client_ — confirm how
// instances are meant to be created and wired up.
class MqttApp {
public:
private:
// Presumably publishes `state` to the state topic — confirm in the implementation.
void app_publish_state(State state);
// Presumably serialises `mask` (likely OR-ed State bits) as JSON into
// `str_json`, which is an output parameter — TODO confirm.
void encode_state_json(uint8_t mask, std::string& str_json);
// Presumably registers the handler for the feed command topic — confirm.
void init_topic_feed_callback();
private:
// Owning pointers; both start out null because no initialiser is given.
std::unique_ptr<MqttClient> mqtt_client_;
std::unique_ptr<LocalStatusClient> rpc_client_;
// std::unique_ptr<Timer> timer_ = std::make_unique<Timer>();
// Default-constructed context: Env::Test with the id from get_machine_id().
TopicContext topic_contex_{};
};
} // namespace af::mqtt::v1
#endif