Files
CanRtDriver/mqtt/mqtt_client.c
2026-01-20 21:15:35 +01:00

304 lines
9.4 KiB
C
Executable File

/*
alternativer Client: libmosquitto-dev:
https://mosquitto.org/api/files/mosquitto-h.html
*/
#include "main.h"
#include <mqtt/mqtt_client.h>
#include <can/can_client.h>
#include <settings/settings.h>
#include <string.h>
#include <mosquitto.h>
// Topics to subscribe
const char* mqtt_topic_motor_gear_request = "Pool/Motor_Gear_Request";
const char* mqtt_topic_motor_power_request = "Pool/Motor_Power_Request";
// Topics to publish
const char* mqtt_topic_status_cyclecounter = "Pool/Status/CycleCounter";
const char* mqtt_topic_motor1_gear = "Pool/Motor1/Gear";
const char* mqtt_topic_motor1_power = "Pool/Motor1/Power";
const char* mqtt_topic_motor2_gear = "Pool/Motor2/Gear";
const char* mqtt_topic_motor2_power = "Pool/Motor2/Power";
const char* mqtt_topic_motor1_switchstate = "Pool/Motor1/SwitchState";
unsigned char nMotor1SwitchState = 255;
const char* mqtt_topic_motor2_switchstate = "Pool/Motor1/SwitchState";
unsigned char nMotor2SwitchState = 255;
const char* mqtt_topic_motor1_actualpowerw = "Pool/Motor1/ActualPowerW";
int iMqttMotor1ActualPowerW = -1;
const char* mqtt_topic_motor2_actualpowerw = "Pool/Motor1/ActualPowerW";
int iMqttMotor2ActualPowerW = -1;
const char* mqtt_broker_addr = "127.0.0.1";
const int mqtt_broker_port = 1883;
struct mosquitto *mosq; /**< Libmosquito MQTT client instance. */
int iHadConnectError = 0;
/// @brief callback function for incoming mqtt messages
/// @param mosq
/// @param userdata
/// @param message
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
char* topic_value = (char *)malloc(message->payloadlen + 1);
memcpy(topic_value, message->payload, message->payloadlen);
topic_value[message->payloadlen] = '\0';
if (strcmp(message->topic, mqtt_topic_motor_gear_request) == 0)
{
int val = 123456789;
if (sscanf(topic_value, "%d", &val))
{
mylog(LOG_INFO, "MQTT: Received value for mqtt_topic_motor_gear_request: %d", val);
if (val == 123456789)
{
val = 0;
}
for (int i=0; i<settings.iMotorCount; i++)
{
Can_SetMotorGear(i, val);
}
}
else
{
mylog(LOG_WARNING, "MQTT: Received mqtt_topic_motor_gear_request: %s", topic_value);
}
}
else if (strcmp(message->topic, mqtt_topic_motor_power_request) == 0)
{
int val = 123456789;
if (sscanf(topic_value, "%d", &val))
{
mylog(LOG_INFO, "MQTT: Received value for mqtt_topic_motor_power_request: %d", val);
if (val == 123456789)
{
val = 0;
}
for (int i=0; i<settings.iMotorCount; i++)
{
Can_SetMotorPower(i, val);
}
}
else
{
mylog(LOG_WARNING, "MQTT: Received mqtt_topic_motor_gear_request: %s", topic_value);
}
}
else
{
mylog(LOG_WARNING, "MQTT: Received publish('%s'): %s", message->topic, topic_value);
}
free(topic_value);
}
/// @brief callback function after connection
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
if (rc == 0)
{
if (mosquitto_subscribe(mosq, NULL, mqtt_topic_motor_gear_request, 0) != MOSQ_ERR_SUCCESS)
{
mylog(LOG_ERR, "MQTT: mosquitto_subscribe(mqtt_topic_motor_gear_request) failed!");
}
if (mosquitto_subscribe(mosq, NULL, mqtt_topic_motor_power_request, 0) != MOSQ_ERR_SUCCESS)
{
mylog(LOG_ERR, "MQTT: mosquitto_subscribe(mqtt_topic_motor_power_request) failed!");
}
char message[10];
snprintf(message, sizeof(message), "0");
if (mosquitto_publish(mosq, NULL, mqtt_topic_motor_gear_request, strlen(message), &message, 0, false) != MOSQ_ERR_SUCCESS)
{
mylog(LOG_ERR, "MQTT: mosquitto_publish(mqtt_topic_motor_gear_request) failed!");
}
if (mosquitto_publish(mosq, NULL, mqtt_topic_motor_power_request, strlen(message), &message, 0, false) != MOSQ_ERR_SUCCESS)
{
mylog(LOG_ERR, "MQTT: mosquitto_publish(mqtt_topic_motor_power_request) failed!");
}
for (int i=0; i<settings.iMotorCount; i++)
{
MqttClient_Publish_MotorSwitchState(i, 0);
MqttClient_Publish_MotorActualPowerW(i, 0);
}
}
}
/// @brief connect to mqtt broker
/// @return
int MqttClient_Connect()
{
int major, minor, revision;
mosquitto_lib_version(&major, &minor, &revision);
mylog(LOG_INFO, "MQTT: Libmosquitto version: %d.%d.%d", major, minor, revision);
// libmosquitto initialization
mosquitto_lib_init();
// create an instance of a mosquitto lib object
mosq = mosquitto_new(NULL, true, NULL);
if (mosq == NULL)
{
mylog(LOG_ERR, "MQTT: Failed to create mosquitto client!/n");
iHadConnectError++;
return 1;
}
// Define a function which will be called by libmosquitto client every time when there is a new MQTT message
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_connect_callback_set(mosq, my_connect_callback);
// Connect to MQTT broker
if (mosquitto_connect(mosq, mqtt_broker_addr, mqtt_broker_port, 60) != MOSQ_ERR_SUCCESS)
{
mylog(LOG_ERR, "MQTT: Connecting to MQTT broker failed");
MqttClient_Close();
iHadConnectError++;
return 2;
}
if (iHadConnectError)
{
/* Wenn wir Verbindungsfehler hatten, dann befinden wir uns wohl in Boot-Prozess und der Mosquitto ist
gerade erst gestartet. Wir müssen hier etwas warten, sonst funktioniert das Subscriben nicht */
sleep(500);
}
mylog(LOG_INFO, "MQTT: Connected successfull after %d errors!", iHadConnectError);
return 0;
}
/// @brief Disconnect from mqtt broker
void MqttClient_Close()
{
//Clean up/destroy objects created by libmosquitto
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
mylog(LOG_INFO, "MQTT: Disconnected!");
}
/// @brief Has to be called cyclic, does the work
void MqttClient_Refresher()
{
mosquitto_loop(mosq, 0, 1);
}
/// @brief publish the cycle counter as a sign of life
void MqttClient_Publisher()
{
char message[100];
snprintf(message, sizeof(message), "%ld", pinfo.cyclecounter);
mosquitto_publish(mosq, NULL, mqtt_topic_status_cyclecounter, strlen(message), message, 0, false);
}
/// @brief Publish the actual motor gear
/// @param iMotorIndex
/// @param iGear
void MqttClient_Publish_MotorGear(int iMotorIndex, int iGear)
{
if (iMotorIndex == 0)
{
char message[100];
snprintf(message, sizeof(message), "%d", iGear);
mosquitto_publish(mosq, NULL, mqtt_topic_motor1_gear, strlen(message), message, 0, false);
}
else if (iMotorIndex == 1)
{
char message[100];
snprintf(message, sizeof(message), "%d", iGear);
mosquitto_publish(mosq, NULL, mqtt_topic_motor2_gear, strlen(message), message, 0, false);
}
}
/// @brief Publish the requested motor power
/// @param iMotorIndex
/// @param iPower
void MqttClient_Publish_MotorPower(int iMotorIndex, int iPower)
{
if (iMotorIndex == 0)
{
char message[100];
snprintf(message, sizeof(message), "%d", iPower);
mosquitto_publish(mosq, NULL, mqtt_topic_motor1_power, strlen(message), message, 0, false);
}
else if (iMotorIndex == 1)
{
char message[100];
snprintf(message, sizeof(message), "%d", iPower);
mosquitto_publish(mosq, NULL, mqtt_topic_motor2_power, strlen(message), message, 0, false);
}
}
/// @brief Publish the actual switches states
/// @param iMotorIndex
/// @param nSwitchState
void MqttClient_Publish_MotorSwitchState(int iMotorIndex, unsigned char nSwitchState)
{
if (iMotorIndex == 0)
{
if (nSwitchState != nMotor1SwitchState)
{
nMotor1SwitchState = nSwitchState;
char message[100];
snprintf(message, sizeof(message), "%2X", nSwitchState);
mosquitto_publish(mosq, NULL, mqtt_topic_motor1_switchstate, strlen(message), message, 0, false);
}
}
else if (iMotorIndex == 1)
{
if (nSwitchState != nMotor2SwitchState)
{
nMotor2SwitchState = nSwitchState;
char message[100];
snprintf(message, sizeof(message), "%2X", nSwitchState);
mosquitto_publish(mosq, NULL, mqtt_topic_motor2_switchstate, strlen(message), message, 0, false);
}
}
}
/// @brief Publish the actual real motor power
/// @param iMotorIndex
/// @param iMotorPowerW
void MqttClient_Publish_MotorActualPowerW(int iMotorIndex, int iMotorPowerW)
{
if (iMotorIndex == 0)
{
if (iMotorPowerW != iMqttMotor1ActualPowerW)
{
iMqttMotor1ActualPowerW = iMotorPowerW;
char message[100];
snprintf(message, sizeof(message), "%d", iMotorPowerW);
mosquitto_publish(mosq, NULL, mqtt_topic_motor1_actualpowerw, strlen(message), message, 0, false);
}
}
else if (iMotorIndex == 1)
{
if (iMotorPowerW != iMqttMotor2ActualPowerW)
{
iMqttMotor2ActualPowerW = iMotorPowerW;
char message[100];
snprintf(message, sizeof(message), "%d", iMotorPowerW);
mosquitto_publish(mosq, NULL, mqtt_topic_motor2_actualpowerw, strlen(message), message, 0, false);
}
}
}