#include #include #include #include #include #include #include // I2C library. Who thought "Wire" was a good name for it? #include #include const char* ssid = "xxx"; const char* password = "xxx"; const char* mqttServer = "xxx"; const int mqttPort = 1883; WiFiClient espClient; PubSubClient client(espClient); WiFiUDP ntpUDP; NTPClient timeClient(ntpUDP, NTP_ADDRESS, NTP_OFFSET, NTP_INTERVAL); uint8_t gpioa_state = 0; unsigned long timeouts[PUMPS_COUNT]; unsigned long last_sensor_update = 0; uint16_t sensor_max[SENSORS_COUNT]; uint16_t sensor_min[SENSORS_COUNT]; uint16_t sensor_burst = 0; uint8_t active_bits(uint8_t v) { uint8_t c; for (c = 0; v; v >>= 1) c += v & 1; return v; } void adjust_calibration(uint16_t v, uint8_t i) { if (v < sensor_min[i]) sensor_min[i] = v; if (v > sensor_max[i]) sensor_max[i] = v; } void publish_availability(uint8_t address, uint8_t available) { char t[60]; sprintf(t, "%s%c/available", TOPIC_MOISTURE_SENSORS, address); if (available) client.publish(t, "online"); else client.publish(t, "offline"); } void publish_sensor_value(uint8_t address, uint16_t v) { char vstr[10]; char t[60]; sprintf(t, "%s%c/value", TOPIC_MOISTURE_SENSORS, address); sprintf(vstr, "%d", v); client.publish(t, vstr); } uint16_t read_sensor(uint8_t address) { // Transmit address digitalWrite(RS485_OUTPUT_ENABLE, HIGH); Serial.write(address); Serial.flush(); digitalWrite(RS485_OUTPUT_ENABLE, LOW); Serial.read(); // Receiver is always enabled, so the first read byte is the address we just sent uint8_t value[2]; size_t l = Serial.readBytes(value, 2); // Blocking read of 2 bytes. Timeout defaults to 1000 ms. Returns number of bytes read if (l < 2) return RS485_ERROR_TIMEOUT; // This value (0xFFFF) can never be the ADC result, so it will be our error code // Reconstruct the read value return ((uint16_t)value[0] << 8) | value[1]; } void update_sensors() { uint16_t v; // Make sure the uart input buffer is empty while(Serial.available() > 0) Serial.read(); for (uint8_t i = 0; i < SENSORS_COUNT; i++) { uint8_t address = (uint8_t)SENSOR_ADDR_START + i; v = read_sensor(address); if (v <= RS485_MAX_INT) { adjust_calibration(v, i); v = map(v, sensor_min[i], sensor_max[i], 0, 100); publish_availability(address, ONLINE); publish_sensor_value(address, v); } else { publish_availability(address, OFFLINE); } } } void gpio_expander_setup() { // Set PORTA channels to output Wire.beginTransmission(GPIO_EXPANDER_ADDR); Wire.write(GPIO_EXPANDER_DIR_A); Wire.write(0x00); // All ouptuts Wire.endTransmission(); } void gpio_expander_set_pin(int pin) { if (pin > 7) return; gpioa_state |= (1 << pin); Wire.beginTransmission(GPIO_EXPANDER_ADDR); Wire.write(GPIO_EXPANDER_PORT_A); Wire.write(gpioa_state); uint8_t i2c_error = Wire.endTransmission(); if (i2c_error != I2C_SUCCESS) { if (i2c_error == I2C_ADDRESS_NACK) { char m[50]; sprintf(m, "ERROR: No device at address %X", GPIO_EXPANDER_ADDR); client.publish(TOPIC_LOG, m); } else if (i2c_error == I2C_DATA_NACK) { client.publish(TOPIC_LOG, "ERROR: Invalid data on I2C bus"); } } } void gpio_expander_clear_pin(int pin) { if (pin > 7) return; gpioa_state &= ~(1 << pin); Wire.beginTransmission(GPIO_EXPANDER_ADDR); Wire.write(GPIO_EXPANDER_PORT_A); Wire.write(gpioa_state); uint8_t i2c_error = Wire.endTransmission(); if (i2c_error != I2C_SUCCESS) { if (i2c_error == I2C_ADDRESS_NACK) { char m[50]; sprintf(m, "ERROR: No device at address %X", GPIO_EXPANDER_ADDR); client.publish(TOPIC_LOG, m); } else if (i2c_error == I2C_DATA_NACK) { client.publish(TOPIC_LOG, "ERROR: Invalid data on I2C bus"); } } } void recv_callback(char* topic, byte* payload, unsigned int length) { if (!strcmp(topic, TOPIC_PUMP_COMMAND)) { // strcmp returns 0 if the strings are equal // The payload is in the form {"id":;"enable":;"timeout":} const int capacity = JSON_OBJECT_SIZE(3); StaticJsonDocument p; DeserializationError err = deserializeJson(p, payload); if (err == DeserializationError::Ok) { uint8_t pump_id = p["id"].as(); if (pump_id > PUMPS_COUNT - 1) { client.publish(TOPIC_LOG, "ERROR: Invalid pump id"); client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_INVALID_PUMP_ID); return; } if (p["enable"].as()) { int t = 30; if (p["timeout"].as() < MAX_PUMP_TIMEOUT) t = p["timeout"].as(); if ((gpioa_state >> pump_id) & 1) { client.publish(TOPIC_LOG, "WARNING: Selected pump is already on"); client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_PUMP_ALREADY_ON); return; } if (active_bits(gpioa_state) > MAX_ACTIVE_PUMPS) { client.publish(TOPIC_LOG, "ERROR: Too many pumps are already active"); client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_TOO_MANY_PUMPS); return; } gpio_expander_set_pin(pump_id); timeouts[pump_id] = timeClient.getEpochTime() + t; // Log operation char log[50]; sprintf(log, "INFO: Turned on pump %d for %d seconds", pump_id + 1, t); client.publish(TOPIC_LOG, log); client.publish(TOPIC_COMMAND_RESULT, CMD_OK); } else { if (!((gpioa_state >> pump_id) & 1)) { client.publish(TOPIC_LOG, "WARNING: Selected pump is already off"); return; } gpio_expander_clear_pin(pump_id); // Log operation char log[50]; sprintf(log, "INFO: Turned off pump %d", pump_id + 1); client.publish(TOPIC_LOG, log); client.publish(TOPIC_COMMAND_RESULT, CMD_OK); } } else { client.publish(TOPIC_LOG, "ERROR: Invalid JSON"); client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_INVALID_JSON); } } else if (!strcmp(topic, TOPIC_SENSOR_BURST)){ if (length > 4) { client.publish(TOPIC_LOG, "ERROR: Invalid sensor burst iteration count. Max iterations: 9999"); client.publish(TOPIC_COMMAND_RESULT, BURST_CMD_TOO_MANY_ITERATIONS); return; } char pstr[5]; uint8_t i; for (i = 0; i < length; i++) { pstr[i] = (char)payload[i]; } pstr[i] = 0; sensor_burst = atoi((char*)pstr); client.publish(TOPIC_COMMAND_RESULT, CMD_OK); } else if (!strcmp(topic, TOPIC_CALIBRATION_RESET)) { if (length > 1) { client.publish(TOPIC_LOG, "ERROR: Only signle-byte addresses allowed"); client.publish(TOPIC_COMMAND_RESULT, CALRST_CMD_INVALID_SENSOR_ID); return; } uint8_t i = (uint8_t)payload[0] - SENSOR_ADDR_START; sensor_max[i] = 0; sensor_min[i] = RS485_MAX_INT; char m[50]; sprintf(m, "Cal of sensor at %X reset", i+SENSOR_ADDR_START); client.publish(TOPIC_LOG, m); client.publish(TOPIC_COMMAND_RESULT, CMD_OK); } else { // Received a message on a topic we didn't subscribe to hmm } } void setup() { for (int i = 0; i < SENSORS_COUNT; i++) { sensor_max[i] = 0; sensor_min[i] = RS485_MAX_INT; } Serial.begin(115200); WiFi.begin(ssid, password); Wire.begin(); while (WiFi.status() != WL_CONNECTED) { delay(500); // Serial.print("Connecting to WiFi.."); } // Serial.println("Connected to the WiFi network"); client.setServer(mqttServer, mqttPort); client.setCallback(recv_callback); while (!client.connected()) { // Serial.println("Connecting to MQTT..."); if (client.connect("autowaterer")) { // Serial.println("connected"); } else { // Serial.print("failed with state "); // Serial.print(client.state()); delay(2000); } } gpio_expander_setup(); // Publish and subscribe examples // client.publish("esp/test", "Hello from ESP8266"); client.subscribe(TOPIC_PUMP_COMMAND); client.subscribe(TOPIC_SENSOR_BURST); client.subscribe(TOPIC_CALIBRATION_RESET); timeClient.begin(); } void loop() { client.loop(); // Should be called regularly timeClient.update(); unsigned long now = timeClient.getEpochTime(); for (int i = 0; i < PUMPS_COUNT; i++) { if ((gpioa_state >> i) & 1) { if (timeouts[i] < now) { gpio_expander_clear_pin(i); char log[50]; sprintf(log, "INFO: Turned off pump %d", i + 1); client.publish(TOPIC_LOG, log); } } } if (sensor_burst > 0) { // Update sensors every 200ms, sensor_burst times sensor_burst--; update_sensors(); } else if (now - last_sensor_update > SENSORS_UPDATE_INTERVAL) { last_sensor_update = now; update_sensors(); } delay(200); }