Files
CanRtDriver/mqtt/mqtt_client.c
2025-12-08 19:36:05 +01:00

339 lines
10 KiB
C
Executable File

/*
Alternative client library: libmosquitto-dev:
https://mosquitto.org/api/files/mosquitto-h.html
*/
#include <main.h>
#include <mqtt/mqtt_client.h>
#include <can/can_client.h>
#include <string.h>
#include <mosquitto.h>
/*
 * MQTT topic names and the last value known for each topic.
 *
 * The cached values are compared against new values by the
 * MqttClient_Publish_* functions so that only changes are published.
 */

/** Topic on which the cycle counter is published. */
const char *mqtt_topic_status_cyclecounter = "Pool/Status/CycleCounter";

/** Gear of motor 1 (published and subscribed). */
const char *mqtt_topic_motor1_gear = "Pool/Motor1/Gear";
int iMqttMotor1Gear = 0;

/** Power of motor 1 (published and subscribed). */
const char *mqtt_topic_motor1_power = "Pool/Motor1/Power";
int iMqttMotor1Power = 0;

/** Gear of motor 2 (published and subscribed). */
const char *mqtt_topic_motor2_gear = "Pool/Motor2/Gear";
int iMqttMotor2Gear = 0;

/** Power of motor 2 (published and subscribed). */
const char *mqtt_topic_motor2_power = "Pool/Motor2/Power";
int iMqttMotor2Power = 0;

/** Switch state of motor 1. */
const char *mqtt_topic_motor1_switchstate = "Pool/Motor1/SwitchState";
unsigned char nMqttMotor1SwitchState = 0;

/** Switch state of motor 2 (previously pointed at the Motor1 topic by mistake). */
const char *mqtt_topic_motor2_switchstate = "Pool/Motor2/SwitchState";
unsigned char nMqttMotor2SwitchState = 0;

/** Actual power in watts of motor 1. */
const char *mqtt_topic_motor1_actualpowerw = "Pool/Motor1/ActualPowerW";
int iMqttMotor1ActualPowerW = 0;

/** Actual power in watts of motor 2 (previously pointed at the Motor1 topic by mistake). */
const char *mqtt_topic_motor2_actualpowerw = "Pool/Motor2/ActualPowerW";
int iMqttMotor2ActualPowerW = 0;

/** Address and port of the MQTT broker to connect to. */
const char *mqtt_broker_addr = "127.0.0.1";
const int mqtt_broker_port = 1883;

struct mosquitto *mosq; /**< Libmosquitto MQTT client instance. */

/** Number of failed connection attempts so far; incremented by MqttClient_Connect(). */
int iHadConnectError = 0;
/**
 * Message callback registered with libmosquitto in MqttClient_Connect().
 *
 * Copies the payload into a NUL-terminated string and, if the topic is one of
 * the four motor gear/power topics, parses it as a decimal integer, stores it
 * in the matching cache variable and forwards it to the CAN client. Messages
 * on any other topic, and payloads that are not a number, are only logged.
 *
 * @param mosq     Client instance that received the message (unused).
 * @param userdata User data registered with the client (unused).
 * @param message  Received message; only topic, payload and payloadlen are read.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    (void)mosq;
    (void)userdata;

    /* Routing table: which cache variable and which CAN setter belong to a topic. */
    struct topic_route
    {
        const char *topic;
        const char *name; /* identifier used in the log output */
        int *cached;
        int motor_index;
        int is_gear; /* non-zero: Can_SetMotorGear, zero: Can_SetMotorPower */
    };
    const struct topic_route routes[] = {
        {mqtt_topic_motor1_gear, "mqtt_topic_motor1_gear", &iMqttMotor1Gear, 0, 1},
        {mqtt_topic_motor1_power, "mqtt_topic_motor1_power", &iMqttMotor1Power, 0, 0},
        {mqtt_topic_motor2_gear, "mqtt_topic_motor2_gear", &iMqttMotor2Gear, 1, 1},
        {mqtt_topic_motor2_power, "mqtt_topic_motor2_power", &iMqttMotor2Power, 1, 0},
    };

    /* The payload is not NUL-terminated, so work on a terminated copy. */
    const size_t payload_len = (message->payloadlen > 0) ? (size_t)message->payloadlen : 0;
    char *topic_value = (char *)malloc(payload_len + 1);
    if (topic_value == NULL)
    {
        printf("%ld: Out of memory while handling publish('%s')\n", pinfo.cyclecounter, message->topic);
        return;
    }
    if (payload_len > 0)
    {
        /* Guarded: an empty message may carry a NULL payload pointer. */
        memcpy(topic_value, message->payload, payload_len);
    }
    topic_value[payload_len] = '\0';

    const struct topic_route *route = NULL;
    for (size_t i = 0; i < sizeof(routes) / sizeof(routes[0]); i++)
    {
        if (strcmp(message->topic, routes[i].topic) == 0)
        {
            route = &routes[i];
            break;
        }
    }

    if (route == NULL)
    {
        printf("%ld: Received publish('%s'): %s\n", pinfo.cyclecounter, message->topic, topic_value);
    }
    else
    {
        int val = 0;
        /* sscanf returns EOF (non-zero!) for an empty payload, so require exactly one conversion. */
        if (sscanf(topic_value, "%d", &val) == 1)
        {
            printf("%ld: Received value for %s: %d\n", pinfo.cyclecounter, route->name, val);
            *route->cached = val;
            if (route->is_gear)
            {
                Can_SetMotorGear(route->motor_index, val);
            }
            else
            {
                Can_SetMotorPower(route->motor_index, val);
            }
        }
        else
        {
            printf("%ld: Received %s: %s\n", pinfo.cyclecounter, route->name, topic_value);
        }
    }

    free(topic_value);
}
/**
 * Initializes libmosquitto, connects to the broker and sets up all topics.
 *
 * After connecting, an initial value of "0" is published on each of the four
 * motor gear/power topics, then the same four topics are subscribed.
 * On every failure iHadConnectError is incremented; on failures after the
 * client was created, MqttClient_Close() is called before returning.
 *
 * @return 0 on success,
 *         1 if the client instance could not be created,
 *         2 if connecting to the broker failed,
 *         10..13 if publishing the initial value failed (motor1 gear, motor1
 *         power, motor2 gear, motor2 power),
 *         20..23 if subscribing failed (same order).
 */
int MqttClient_Connect()
{
    int major, minor, revision;
    mosquitto_lib_version(&major, &minor, &revision);
    printf("Libmosquitto version: %d.%d.%d\n", major, minor, revision);

    // libmosquitto initialization
    mosquitto_lib_init();

    // create an instance of a mosquitto lib object
    mosq = mosquitto_new(NULL, true, NULL);
    if (mosq == NULL)
    {
        printf("Failed to create mosquitto client!\n");
        iHadConnectError++;
        return 1;
    }

    // Define a function which will be called by libmosquitto client every time when there is a new MQTT message
    mosquitto_message_callback_set(mosq, my_message_callback);

    // Connect to MQTT broker
    if (mosquitto_connect(mosq, mqtt_broker_addr, mqtt_broker_port, 60) != MOSQ_ERR_SUCCESS)
    {
        printf("Error: connecting to MQTT broker failed\n");
        MqttClient_Close();
        iHadConnectError++;
        return 2;
    }

    if (iHadConnectError)
    {
        /* If we had connection errors before, we are probably in the boot process and
           the Mosquitto broker has only just started. We have to wait a little here,
           otherwise subscribing does not work. */
        sleep(10);
    }

    /* Topics that are both initialized and subscribed, with the identifier used in log output. */
    const char *const topics[] = {
        mqtt_topic_motor1_gear,
        mqtt_topic_motor1_power,
        mqtt_topic_motor2_gear,
        mqtt_topic_motor2_power,
    };
    const char *const topic_names[] = {
        "mqtt_topic_motor1_gear",
        "mqtt_topic_motor1_power",
        "mqtt_topic_motor2_gear",
        "mqtt_topic_motor2_power",
    };
    const int topic_count = (int)(sizeof(topics) / sizeof(topics[0]));

    // publish all topics we want to subscribe
    const char message[] = "0";
    for (int i = 0; i < topic_count; i++)
    {
        if (mosquitto_publish(mosq, NULL, topics[i], (int)strlen(message), message, 0, false) != MOSQ_ERR_SUCCESS)
        {
            printf("mosquitto_publish(%s) fehlgeschlagen!\n", topic_names[i]);
            MqttClient_Close();
            iHadConnectError++;
            return 10 + i;
        }
    }

    // subscribe all needed topics
    for (int i = 0; i < topic_count; i++)
    {
        if (mosquitto_subscribe(mosq, NULL, topics[i], 0) != MOSQ_ERR_SUCCESS)
        {
            printf("mosquitto_subscribe(%s) fehlgeschlagen!\n", topic_names[i]);
            MqttClient_Close();
            iHadConnectError++;
            return 20 + i;
        }
    }

    printf("MQTT: Connected successfull!\n");
    return 0;
}
/**
 * Destroys the libmosquitto client instance and cleans up the library.
 *
 * The global client pointer is reset to NULL afterwards so that later calls
 * that still pass it to libmosquitto do not operate on freed memory.
 */
void MqttClient_Close()
{
    // Clean up/destroy objects created by libmosquitto
    mosquitto_destroy(mosq);
    mosq = NULL; /* do not leave a dangling pointer behind */
    mosquitto_lib_cleanup();
    printf("MQTT: Disconnected!\n");
}
/**
 * Runs one iteration of the libmosquitto network loop on the global client.
 *
 * The call uses a timeout of 0 and a max_packets value of 1; the result of
 * mosquitto_loop() is not evaluated.
 */
void MqttClient_Refresher()
{
    const int timeout = 0;     /* NOTE(review): per the libmosquitto docs, milliseconds - confirm */
    const int max_packets = 1;

    mosquitto_loop(mosq, timeout, max_packets);
}
/**
 * Publishes the current value of pinfo.cyclecounter as decimal text on the
 * cycle counter status topic (QoS 0, not retained).
 */
void MqttClient_Publisher()
{
    char payload[100];

    snprintf(payload, sizeof(payload), "%ld", pinfo.cyclecounter);

    const int payload_len = (int)strlen(payload);
    mosquitto_publish(mosq, NULL, mqtt_topic_status_cyclecounter, payload_len, payload, 0, false);
}
/**
 * Publishes the gear of a motor if it differs from the last known value.
 *
 * @param iMotorIndex 0 for motor 1, 1 for motor 2; any other value is ignored.
 * @param iGear       Gear value; stored in the cache and sent as decimal text.
 */
void MqttClient_Publish_MotorGear(int iMotorIndex, int iGear)
{
    int *cached;
    const char *topic;

    switch (iMotorIndex)
    {
    case 0:
        cached = &iMqttMotor1Gear;
        topic = mqtt_topic_motor1_gear;
        break;
    case 1:
        cached = &iMqttMotor2Gear;
        topic = mqtt_topic_motor2_gear;
        break;
    default:
        return;
    }

    /* Nothing to do when the value is unchanged. */
    if (*cached == iGear)
    {
        return;
    }
    *cached = iGear;

    char payload[100];
    snprintf(payload, sizeof(payload), "%d", iGear);
    mosquitto_publish(mosq, NULL, topic, (int)strlen(payload), payload, 0, false);
}
/**
 * Publishes the power setting of a motor if it differs from the last known value.
 *
 * @param iMotorIndex 0 for motor 1, 1 for motor 2; any other value is ignored.
 * @param iPower      Power value; stored in the cache and sent as decimal text.
 */
void MqttClient_Publish_MotorPower(int iMotorIndex, int iPower)
{
    int *cached;
    const char *topic;

    switch (iMotorIndex)
    {
    case 0:
        cached = &iMqttMotor1Power;
        topic = mqtt_topic_motor1_power;
        break;
    case 1:
        cached = &iMqttMotor2Power;
        topic = mqtt_topic_motor2_power;
        break;
    default:
        return;
    }

    /* Nothing to do when the value is unchanged. */
    if (*cached == iPower)
    {
        return;
    }
    *cached = iPower;

    char payload[100];
    snprintf(payload, sizeof(payload), "%d", iPower);
    mosquitto_publish(mosq, NULL, topic, (int)strlen(payload), payload, 0, false);
}
/**
 * Publishes the switch state of a motor if it differs from the last known value.
 *
 * @param iMotorIndex  0 for motor 1, 1 for motor 2; any other value is ignored.
 * @param nSwitchState Switch state; stored in the cache and sent as decimal text.
 */
void MqttClient_Publish_MotorSwitchState(int iMotorIndex, unsigned char nSwitchState)
{
    unsigned char *cached;
    const char *topic;

    switch (iMotorIndex)
    {
    case 0:
        cached = &nMqttMotor1SwitchState;
        topic = mqtt_topic_motor1_switchstate;
        break;
    case 1:
        cached = &nMqttMotor2SwitchState;
        topic = mqtt_topic_motor2_switchstate;
        break;
    default:
        return;
    }

    /* Nothing to do when the value is unchanged. */
    if (*cached == nSwitchState)
    {
        return;
    }
    *cached = nSwitchState;

    char payload[100];
    snprintf(payload, sizeof(payload), "%d", nSwitchState);
    mosquitto_publish(mosq, NULL, topic, (int)strlen(payload), payload, 0, false);
}
/**
 * Publishes the actual power of a motor if it differs from the last known value.
 *
 * @param iMotorIndex  0 for motor 1, 1 for motor 2; any other value is ignored.
 * @param iMotorPowerW Power value; stored in the cache and sent as decimal text.
 */
void MqttClient_Publish_MotorActualPowerW(int iMotorIndex, int iMotorPowerW)
{
    int *cached;
    const char *topic;

    switch (iMotorIndex)
    {
    case 0:
        cached = &iMqttMotor1ActualPowerW;
        topic = mqtt_topic_motor1_actualpowerw;
        break;
    case 1:
        cached = &iMqttMotor2ActualPowerW;
        topic = mqtt_topic_motor2_actualpowerw;
        break;
    default:
        return;
    }

    /* Nothing to do when the value is unchanged. */
    if (*cached == iMotorPowerW)
    {
        return;
    }
    *cached = iMotorPowerW;

    char payload[100];
    snprintf(payload, sizeof(payload), "%d", iMotorPowerW);
    mosquitto_publish(mosq, NULL, topic, (int)strlen(payload), payload, 0, false);
}