/* alternativer Client: libmosquitto-dev: https://mosquitto.org/api/files/mosquitto-h.html */ #include "main.h" #include #include #include #include const char* mqtt_topic_status_cyclecounter = "Pool/Status/CycleCounter"; const char* mqtt_topic_motor1_gear = "Pool/Motor1/Gear"; int iMqttMotor1Gear = 0; const char* mqtt_topic_motor1_power = "Pool/Motor1/Power"; int iMqttMotor1Power = 0; const char* mqtt_topic_motor2_gear = "Pool/Motor2/Gear"; int iMqttMotor2Gear = 0; const char* mqtt_topic_motor2_power = "Pool/Motor2/Power"; int iMqttMotor2Power = 0; const char* mqtt_topic_motor1_switchstate = "Pool/Motor1/SwitchState"; unsigned char nMqttMotor1SwitchState = 0; const char* mqtt_topic_motor2_switchstate = "Pool/Motor1/SwitchState"; unsigned char nMqttMotor2SwitchState = 0; const char* mqtt_topic_motor1_actualpowerw = "Pool/Motor1/ActualPowerW"; int iMqttMotor1ActualPowerW = 0; const char* mqtt_topic_motor2_actualpowerw = "Pool/Motor1/ActualPowerW"; int iMqttMotor2ActualPowerW = 0; const char* mqtt_broker_addr = "127.0.0.1"; const int mqtt_broker_port = 1883; struct mosquitto *mosq; /**< Libmosquito MQTT client instance. */ int iHadConnectError = 0; /// @brief callback function for incoming mqtt messages /// @param mosq /// @param userdata /// @param message void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { char* topic_value = (char *)malloc(message->payloadlen + 1); memcpy(topic_value, message->payload, message->payloadlen); topic_value[message->payloadlen] = '\0'; if (strcmp(message->topic, mqtt_topic_motor1_gear) == 0) { int val = 9999; if (sscanf(topic_value, "%d", &val)) { mylog(LOG_DEBUG, "MQTT: Received value for mqtt_topic_motor1_gear: %d", val); iMqttMotor1Gear = val; Can_SetMotorGear(0, val); } else { mylog(LOG_WARNING, "MQTT: Received mqtt_topic_motor1_gear: %s", topic_value); } } else if (strcmp(message->topic, mqtt_topic_motor1_power) == 0) { int val = 9999; if (sscanf(topic_value, "%d", &val)) { mylog(LOG_DEBUG, "MQTT: Received value for mqtt_topic_motor1_power: %d", val); iMqttMotor1Power = val; Can_SetMotorPower(0, val); } else { mylog(LOG_WARNING, "MQTT: Received mqtt_topic_motor1_power: %s", topic_value); } } else if (strcmp(message->topic, mqtt_topic_motor2_gear) == 0) { int val = 9999; if (sscanf(topic_value, "%d", &val)) { mylog(LOG_DEBUG, "MQTT: Received value for mqtt_topic_motor2_gear: %d", val); iMqttMotor2Gear = val; Can_SetMotorGear(1, val); } else { mylog(LOG_WARNING, "MQTT: Received mqtt_topic_motor2_gear: %s", topic_value); } } else if (strcmp(message->topic, mqtt_topic_motor2_power) == 0) { int val = 9999; if (sscanf(topic_value, "%d", &val)) { mylog(LOG_DEBUG, "MQTT: Received value for mqtt_topic_motor2_power: %d", val); iMqttMotor2Power = val; Can_SetMotorPower(1, val); } else { mylog(LOG_WARNING, "MQTT: Received mqtt_topic_motor2_power: %s", topic_value); } } else { mylog(LOG_WARNING, "MQTT: Received publish('%s'): %s", message->topic, topic_value); } free(topic_value); } /// @brief connect to mqtt broker /// @return int MqttClient_Connect() { int major, minor, revision; mosquitto_lib_version(&major, &minor, &revision); mylog(LOG_INFO, "MQTT: Libmosquitto version: %d.%d.%d", major, minor, revision); // libmosquitto initialization mosquitto_lib_init(); // create an instance of a mosquitto lib object mosq = mosquitto_new(NULL, true, NULL); if (mosq == NULL) { mylog(LOG_ERR, "MQTT: Failed to create mosquitto client!/n"); iHadConnectError++; return 1; } // Define a function which will be called by libmosquitto client every time when there is a new MQTT message mosquitto_message_callback_set(mosq, my_message_callback); // Connect to MQTT broker if (mosquitto_connect(mosq, mqtt_broker_addr, mqtt_broker_port, 60) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: Connecting to MQTT broker failed"); MqttClient_Close(); iHadConnectError++; return 2; } if (iHadConnectError) { /* Wenn wir Verbindungsfehler hatten, dann befinden wir uns wohl in Boot-Prozess und der Mosquitto ist gerade erst gestartet. Wir müssen hier etwas warten, sonst funktioniert das Subscriben nicht */ sleep(500); } // publish all topics we want to subscribe char message[10]; snprintf(message, sizeof(message), "0"); if (mosquitto_publish(mosq, NULL, mqtt_topic_motor1_gear, strlen(message), &message, 0, false) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_publish(mqtt_topic_motor1_gear) failed!"); MqttClient_Close(); iHadConnectError++; return 10; } if (mosquitto_publish(mosq, NULL, mqtt_topic_motor1_power, strlen(message), &message, 0, false) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_publish(mqtt_topic_motor1_power) failed!"); MqttClient_Close(); iHadConnectError++; return 11; } if (mosquitto_publish(mosq, NULL, mqtt_topic_motor2_gear, strlen(message), &message, 0, false) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_publish(mqtt_topic_motor2_gear) failed!"); MqttClient_Close(); iHadConnectError++; return 12; MqttClient_Close(); } if (mosquitto_publish(mosq, NULL, mqtt_topic_motor2_power, strlen(message), &message, 0, false) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_publish(mqtt_topic_motor2_power) failed!"); MqttClient_Close(); iHadConnectError++; return 13; } // subscribe all needed topics if (mosquitto_subscribe(mosq, NULL, mqtt_topic_motor1_gear, 0) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_subscribe(mqtt_topic_motor1_gear) failed!"); iHadConnectError++; return 20; } if (mosquitto_subscribe(mosq, NULL, mqtt_topic_motor1_power, 0) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_subscribe(mqtt_topic_motor1_power) failed!"); MqttClient_Close(); iHadConnectError++; return 21; } if (mosquitto_subscribe(mosq, NULL, mqtt_topic_motor2_gear, 0) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_subscribe(mqtt_topic_motor2_gear) failed!"); MqttClient_Close(); iHadConnectError++; return 22; } if (mosquitto_subscribe(mosq, NULL, mqtt_topic_motor2_power, 0) != MOSQ_ERR_SUCCESS) { mylog(LOG_ERR, "MQTT: mosquitto_subscribe(mqtt_topic_motor2_power) failed!"); MqttClient_Close(); iHadConnectError++; return 23; } nMqttMotor1SwitchState = 255; MqttClient_Publish_MotorSwitchState(0, 0); nMqttMotor2SwitchState = 255; MqttClient_Publish_MotorSwitchState(1, 0); iMqttMotor1ActualPowerW = -1; MqttClient_Publish_MotorActualPowerW(0, 0); iMqttMotor2ActualPowerW = -1; MqttClient_Publish_MotorActualPowerW(1, 0); mylog(LOG_INFO, "MQTT: Connected successfull after %d errors!", iHadConnectError); return 0; } /// @brief Disconnect from mqtt broker void MqttClient_Close() { //Clean up/destroy objects created by libmosquitto mosquitto_destroy(mosq); mosquitto_lib_cleanup(); mylog(LOG_INFO, "MQTT: Disconnected!"); } /// @brief Has to be called cyclic, does the work void MqttClient_Refresher() { mosquitto_loop(mosq, 0, 1); } /// @brief publish the cycle counter as a sign of life void MqttClient_Publisher() { char message[100]; snprintf(message, sizeof(message), "%ld", pinfo.cyclecounter); mosquitto_publish(mosq, NULL, mqtt_topic_status_cyclecounter, strlen(message), message, 0, false); } /// @brief Publish the actual motor gear /// @param iMotorIndex /// @param iGear void MqttClient_Publish_MotorGear(int iMotorIndex, int iGear) { if (iMotorIndex == 0) { if (iGear != iMqttMotor1Gear) { iMqttMotor1Gear = iGear; char message[100]; snprintf(message, sizeof(message), "%d", iGear); mosquitto_publish(mosq, NULL, mqtt_topic_motor1_gear, strlen(message), message, 0, false); } } else if (iMotorIndex == 1) { if (iGear != iMqttMotor2Gear) { iMqttMotor2Gear = iGear; char message[100]; snprintf(message, sizeof(message), "%d", iGear); mosquitto_publish(mosq, NULL, mqtt_topic_motor2_gear, strlen(message), message, 0, false); } } } /// @brief Publish the requested motor power /// @param iMotorIndex /// @param iPower void MqttClient_Publish_MotorPower(int iMotorIndex, int iPower) { if (iMotorIndex == 0) { if (iPower != iMqttMotor1Power) { iMqttMotor1Power = iPower; char message[100]; snprintf(message, sizeof(message), "%d", iPower); mosquitto_publish(mosq, NULL, mqtt_topic_motor1_power, strlen(message), message, 0, false); } } else if (iMotorIndex == 1) { if (iPower != iMqttMotor2Power) { iMqttMotor2Power = iPower; char message[100]; snprintf(message, sizeof(message), "%d", iPower); mosquitto_publish(mosq, NULL, mqtt_topic_motor2_power, strlen(message), message, 0, false); } } } /// @brief Publish the actual switches states /// @param iMotorIndex /// @param nSwitchState void MqttClient_Publish_MotorSwitchState(int iMotorIndex, unsigned char nSwitchState) { if (iMotorIndex == 0) { if (nSwitchState != nMqttMotor1SwitchState) { nMqttMotor1SwitchState = nSwitchState; char message[100]; snprintf(message, sizeof(message), "%2X", nSwitchState); mosquitto_publish(mosq, NULL, mqtt_topic_motor1_switchstate, strlen(message), message, 0, false); } } else if (iMotorIndex == 1) { if (nSwitchState != nMqttMotor2SwitchState) { nMqttMotor2SwitchState = nSwitchState; char message[100]; snprintf(message, sizeof(message), "%2X", nSwitchState); mosquitto_publish(mosq, NULL, mqtt_topic_motor2_switchstate, strlen(message), message, 0, false); } } } /// @brief Publish the actual real motor power /// @param iMotorIndex /// @param iMotorPowerW void MqttClient_Publish_MotorActualPowerW(int iMotorIndex, int iMotorPowerW) { if (iMotorIndex == 0) { if (iMotorPowerW != iMqttMotor1ActualPowerW) { iMqttMotor1ActualPowerW = iMotorPowerW; char message[100]; snprintf(message, sizeof(message), "%d", iMotorPowerW); mosquitto_publish(mosq, NULL, mqtt_topic_motor1_actualpowerw, strlen(message), message, 0, false); } } else if (iMotorIndex == 1) { if (iMotorPowerW != iMqttMotor2ActualPowerW) { iMqttMotor2ActualPowerW = iMotorPowerW; char message[100]; snprintf(message, sizeof(message), "%d", iMotorPowerW); mosquitto_publish(mosq, NULL, mqtt_topic_motor2_actualpowerw, strlen(message), message, 0, false); } } }