daily update

This commit is contained in:
2025-12-22 00:51:03 +08:00
parent 1c3456c1a9
commit ea995a636f
160 changed files with 19154 additions and 0 deletions

View File

@@ -0,0 +1,207 @@
#include "mqtt.hh"
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <mosquitto.h>
#include <mutex>
#include <stdexcept>
#include <sys/stat.h>
#include <thread>
#include <utility>
#include "easylogging++.h"
MosquittoLibInit::MosquittoLibInit() { mosquitto_lib_init(); }
MosquittoLibInit::~MosquittoLibInit() { mosquitto_lib_cleanup(); }
MosquittoLibInit& MosquittoLibInit::get_instance() {
static MosquittoLibInit inst;
return inst;
}
static MosquittoLibInit g_mosq_guard;
MqttClient::MqttClient(const std::string& host, const uint16_t& port, int keepalive, std::string clientid, bool clean_session): host_(std::move(host)),
port_(port),
clientid_(std::move(clientid)),
clean_session_(clean_session),
keepalive_(keepalive) {
mosq_ = mosquitto_new(clientid_.empty() ? nullptr : clientid_.c_str(), clean_session_, this);
if (!mosq_) {
throw std::runtime_error("mosquitto_new failed");
}
int rc = mosquitto_threaded_set(mosq_, true);
if (rc != MOSQ_ERR_SUCCESS) {
throw std::runtime_error(std::string("mosquitto_threaded_set failed: ")
+ mosquitto_strerror(rc));
}
mosquitto_connect_callback_set(mosq_, handle_connect);
mosquitto_disconnect_callback_set(mosq_, handle_disconnect);
mosquitto_message_callback_set(mosq_, handle_message);
}
MqttClient::~MqttClient() {
stop_ = true;
if (loop_thread_.joinable()) {
loop_thread_.join();
}
if (mosq_) {
mosquitto_destroy(mosq_);
mosq_ = nullptr;
}
}
bool MqttClient::connect(int timeout_ms) {
std::lock_guard<std::mutex> lock(mutex_);
if (pwdneed_ == true) {
int rc = mosquitto_username_pw_set(mosq_, username_.c_str(), userpwd_.c_str());
if (rc != MOSQ_ERR_SUCCESS) {
log_mqttclient_error("mosquitto_username_pw_set failed: ", rc);
return false;
}
}
int rc = mosquitto_connect(mosq_, host_.c_str(), port_, keepalive_);
if (rc != MOSQ_ERR_SUCCESS) {
log_mqttclient_error("mosquitto_connect failed: ", rc);
return false;
}
stop_ = false;
loop_thread_ = std::thread(&MqttClient::loop_thread_function, this);
auto start = std::chrono::steady_clock::now();
while (!connected_) {
auto now = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count() > timeout_ms) {
LOG(INFO) << "MQTT connect timeout\n";
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return true;
}
void MqttClient::disconnect() {
std::lock_guard<std::mutex> lock(mutex_);
if (!mosq_) {
return;
}
mosquitto_destroy(mosq_);
stop_ = true;
}
void MqttClient::set_auth(const char* username, const char* userpwd) {
username_ = username;
userpwd_ = userpwd;
pwdneed_ = true;
}
bool MqttClient::publish(const std::string& topic,
const void* payload,
size_t len,
int qos,
bool retain) {
std::lock_guard<std::mutex> lock(mutex_);
if (!mosq_) {
return false;
}
int mid = 0;
int rc = mosquitto_publish(mosq_, &mid, topic.c_str(), static_cast<int>(len), payload, qos, retain);
if (rc != MOSQ_ERR_SUCCESS) {
log_mqttclient_error("mosquitto_publish failed: ", rc);
return false;
}
return true;
}
bool MqttClient::publish(const std::string& topic, const std::string& paylod, int qos, bool retain) {
return publish(topic, paylod.data(), paylod.size(), qos, retain);
}
bool MqttClient::subscribe(const std::string& topic, int qos, MessageHandler cb) {
std::lock_guard<std::mutex> lock(mutex_);
if (!mosq_) {
return false;
}
int rc = mosquitto_subscribe(mosq_, nullptr, topic.c_str(), qos);
if (rc != MOSQ_ERR_SUCCESS) {
log_mqttclient_error("mosquitto_subscribe failed: ", rc);
return false;
}
handers_[topic] = std::move(cb);
return true;
}
void MqttClient::handle_connect(mosquitto* mosq, void* obj, int rc) {
auto* self = static_cast<MqttClient*>(obj);
if (rc == 0) {
self->connected_ = true;
LOG(INFO) << "MQTT connected: " << self->host_ << ":" << self->port_;
}
else {
LOG(ERROR) << "MQTT connect failed: " << self->host_ << ":" << self->port_;
}
}
void MqttClient::handle_disconnect(mosquitto* mosq, void* obj, int rc) {
auto* self = static_cast<MqttClient*>(obj);
self->connected_ = false;
LOG(INFO) << "MQTT disconnected: " << self->host_ << ":" << self->port_;
}
void MqttClient::handle_message(mosquitto* mosq, void* obj, const mosquitto_message* msg) {
auto* self = static_cast<MqttClient*>(obj);
if (!msg || !msg->payload) {
return;
}
std::string topic = msg->topic;
std::vector<uint8_t> payload;
if (msg->payload && msg->payloadlen > 0) {
auto* p = static_cast<uint8_t*>(msg->payload);
payload.assign(p, p + msg->payloadlen);
}
std::lock_guard<std::mutex> lock(self->mutex_);
auto ite = self->handers_.find(topic);
if (ite != self->handers_.end()) {
ite->second(topic, payload, msg->qos, msg->retain);
}
}
void MqttClient::loop_thread_function() {
while (!stop_) {
int rc = mosquitto_loop(mosq_, 100, 1);
if (stop_) {
break;
}
if (rc != MOSQ_ERR_SUCCESS) {
connected_ = false;
log_mqttclient_error("mosquitto_loop error: ", rc);
std::this_thread::sleep_for(std::chrono::seconds(1));
mosquitto_reconnect(mosq_);
}
}
}
void MqttClient::log_mqttclient_error(const std::string& error_str, int rc) {
LOG(ERROR) << error_str << mosquitto_strerror(rc);
}

View File

@@ -0,0 +1,75 @@
#pragma once
#include <atomic>
#include <cstdint>
#include <functional>
#include <mosquitto.h>
#include <string>
#include <unordered_map>
#include <vector>
#include <iostream>
#include <thread>
#include <mutex>
class MosquittoLibInit {
public:
MosquittoLibInit();
~MosquittoLibInit();
static MosquittoLibInit& get_instance();
};
class MqttClient {
public:
using MessageHandler = std::function<void(const std::string& topic,
const std::vector<uint8_t>& payload,
int qos,
bool retain)>;
MqttClient(const std::string& host, const uint16_t& port, int keepalive = 60, std::string clientid = {}, bool clean_session = true);
~MqttClient();
MqttClient(const MqttClient&) = delete;
MqttClient& operator=(const MqttClient&) = delete;
bool connect(int timeout_ms = 3000);
void disconnect();
bool publish(const std::string& topic, const void* payload, size_t len, int qos = 0, bool retain = false);
bool publish(const std::string& topic, const std::string& paylod, int qos = 0, bool retain = false);
bool subscribe(const std::string& topic, int qos, MessageHandler cb);
bool isconnect();
void set_auth(const char* username, const char* userpwd);
private:
static void handle_connect(mosquitto* mosq, void* obj, int rc);
static void handle_disconnect(mosquitto* mosq, void* obj, int rc);
static void handle_message(mosquitto* mosq, void* obj, const mosquitto_message* msg);
void loop_thread_function();
static void log_mqttclient_error(const std::string& error_str, int rc);
private:
std::string host_;
uint16_t port_;
std::string clientid_;
bool clean_session_;
int keepalive_;
mosquitto* mosq_ = nullptr;
std::atomic<bool> connected_{ false };
std::atomic<bool> stop_{ true };
std::thread loop_thread_;
std::mutex mutex_;
std::string username_;
std::string userpwd_;
std::atomic<bool> pwdneed_{ false };
std::unordered_map<std::string, MessageHandler> handers_;
};

View File

@@ -0,0 +1,27 @@
#include "mqtt/mqtt_app.hh"
#include "jsoncpp/json/json.h"
#include "jsoncpp/json/value.h"
#include "jsoncpp/json/writer.h"
#include <cstdint>
using namespace af::mqtt::v1;
void MqttApp::app_publish_state(State state) {
std::string payload;
uint8_t mask = static_cast<uint8_t>(state);
encode_state_json(mask, payload);
mqtt_client_->publish(Topics::state(topic_contex_), payload, 1, false);
}
void MqttApp::encode_state_json(uint8_t mask, std::string& str_json) {
Json::Value root;
root["version"] = "v1";
root["state"] = mask;
root["left"] = 0;
Json::FastWriter write;
str_json = write.write(root);
}
void MqttApp::init_topic_feed_callback() {
mqtt_client_->subscribe(Topics::cmd_ack(topic_contex_, "feed"), 0, nullptr);
}

View File

@@ -0,0 +1,193 @@
#ifndef MQTT_APP_H
#define MQTT_APP_H
#include <iostream>
#include <memory>
#include <string_view>
#include <string>
#include "mqtt.hh"
#include <fstream>
#include <string>
#include <memory.h>
#include <format>
#include "localstatusclient.h"
namespace {
std::string read_file_trim(const std::string& path) {
std::ifstream ifs(path);
std::string s, line;
if (!ifs) return {};
while (std::getline(ifs, line)) {
s += line;
}
// 简单 trim
while (!s.empty() && (s.back() == '\n' || s.back() == '\r' || s.back() == ' '))
s.pop_back();
return s;
}
std::string get_machine_id() {
std::string id = read_file_trim("/etc/machine-id");
if (id.empty())
id = read_file_trim("/var/lib/dbus/machine-id");
return id;
}
} // namespace
namespace af::mqtt::v1 {
enum class Env {
Dev,
Test,
Prod
};
inline std::string_view env_to_sv(Env e) noexcept {
switch (e) {
case Env::Dev:
return "dev";
case Env::Test:
return "test";
case Env::Prod:
return "prod";
}
return "test";
}
struct TopicContext {
Env env{ Env::Test };
std::string deviceId = get_machine_id();
TopicContext() = default;
TopicContext(Env e): env(e) {}
};
class Topics {
public:
static std::string base(const TopicContext& c) {
return std::format("af/v1/{}/{}", env_to_sv(c.env), c.deviceId);
}
static std::string state(const TopicContext& c) {
return std::format("{}/state", base(c));
}
static std::string telemetry(const TopicContext& c) { // retain=false
return std::format("{}/telemetry", base(c));
}
// event/<type> e.g. boot, feed_done, jam, low_food, lid_open, fault, ota
static std::string event(const TopicContext& c, std::string_view type) {
return std::format("{}/event/{}", base(c), type);
}
// cmd/ack/<cmd> e.g. feed, stop_feed, feed_plan_set
static std::string cmd_ack(const TopicContext& c, std::string_view cmd) {
return std::format("{}/cmd/ack/{}", base(c), cmd);
}
// cmd/res/<cmd>
static std::string cmd_res(const TopicContext& c, std::string_view cmd) {
return std::format("{}/cmd/res/{}", base(c), cmd);
}
// config/ack
static std::string config_ack(const TopicContext& c) {
return std::format("{}/config/ack", base(c));
}
// stream/state
static std::string stream_state(const TopicContext& c) {
return std::format("{}/stream/state", base(c));
}
// stream/event/<type>
static std::string stream_event(const TopicContext& c, std::string_view type) {
return std::format("{}/stream/event/{}", base(c), type);
}
// log/<level> debug|info|warn|error
static std::string log(const TopicContext& c, std::string_view level) {
return std::format("{}/log/{}", base(c), level);
}
// ---------- Server -> Device (subscribe) ----------
// Subscribe to all commands:
static std::string sub_cmd_all(const TopicContext& c) { // .../cmd/+
return std::format("{}/cmd/+", base(c));
}
// Or build specific cmd topic for debugging/testing:
static std::string cmd(const TopicContext& c, std::string_view cmd) { // .../cmd/<cmd>
return std::format("{}/cmd/{}", base(c), cmd);
}
// config/set
static std::string sub_config_set(const TopicContext& c) {
return std::format("{}/config/set", base(c));
}
// ota/notify
static std::string sub_ota_notify(const TopicContext& c) {
return std::format("{}/ota/notify", base(c));
}
// stream/cmd/+
static std::string sub_stream_cmd_all(const TopicContext& c) {
return std::format("{}/stream/cmd/+", base(c));
}
// For testing: stream/cmd/<cmd> (start/stop)
static std::string stream_cmd(const TopicContext& c, std::string_view cmd) {
return std::format("{}/stream/cmd/{}", base(c), cmd);
}
// ---------- Helpers (recommended topic constants) ----------
// Common cmd names
static constexpr std::string_view CMD_FEED = "feed";
static constexpr std::string_view CMD_STOP_FEED = "stop_feed";
static constexpr std::string_view CMD_PLAN_SET = "feed_plan_set";
static constexpr std::string_view CMD_PLAN_GET = "feed_plan_get";
static constexpr std::string_view CMD_PING = "ping";
// Common event types
static constexpr std::string_view EVT_BOOT = "boot";
static constexpr std::string_view EVT_FEED_DONE = "feed_done";
static constexpr std::string_view EVT_JAM = "jam";
static constexpr std::string_view EVT_LOW_FOOD = "low_food";
static constexpr std::string_view EVT_LID_OPEN = "lid_open";
static constexpr std::string_view EVT_FAULT = "fault";
static constexpr std::string_view EVT_OTA = "ota";
// Common stream event types
static constexpr std::string_view STRM_EVT_ERROR = "error";
static constexpr std::string_view STRM_EVT_CLIENT_JOIN = "client_join";
static constexpr std::string_view STRM_EVT_CLIENT_LEAVE = "client_leave";
static constexpr std::string_view STRM_EVT_BITRATE_DROP = "bitrate_drop";
};
enum class State {
Online = 0x01,
Free = 0x02,
Feed = 0x04,
Push = 0x08,
};
class MqttApp {
public:
private:
void app_publish_state(State state);
void encode_state_json(uint8_t mask, std::string& str_json);
void init_topic_feed_callback();
private:
std::unique_ptr<MqttClient> mqtt_client_;
std::unique_ptr<LocalStatusClient> rpc_client_;
// std::unique_ptr<Timer> timer_ = std::make_unique<Timer>();
TopicContext topic_contex_{};
};
} // namespace af::mqtt::v1
#endif