You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

301 lines
9.9 KiB

#include <main.h>
#include <string.h>
#include <Arduino.h>
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <Wire.h> // I2C library. Who thought "Wire" was a good name for it?
#include <WiFiUdp.h>
#include <NTPClient.h>
// WiFi credentials and MQTT broker location ("xxx" are placeholders).
const char* ssid = "xxx";
const char* password = "xxx";
const char* mqttServer = "xxx";
const int mqttPort = 1883;
// MQTT client running on top of the WiFi TCP client.
WiFiClient espClient;
PubSubClient client(espClient);
// NTP client; its epoch time is the time base for pump timeouts and sensor updates.
WiFiUDP ntpUDP;
NTPClient timeClient(ntpUDP, NTP_ADDRESS, NTP_OFFSET, NTP_INTERVAL);
// Shadow copy of the byte last written to the expander's PORTA register; bit n mirrors pin n.
uint8_t gpioa_state = 0;
// Per-pump epoch time after which loop() switches the pump off again.
unsigned long timeouts[PUMPS_COUNT];
// Epoch time of the last periodic (non-burst) sensor update.
unsigned long last_sensor_update = 0;
// Largest / smallest raw reading seen per sensor; used to scale readings to 0..100.
uint16_t sensor_max[SENSORS_COUNT];
uint16_t sensor_min[SENSORS_COUNT];
// Number of remaining back-to-back sensor updates requested over MQTT.
uint16_t sensor_burst = 0;
// Count the bits that are set in a byte.
//
// v: byte to inspect (used by the caller on the pump output bitmask).
// Returns the number of 1-bits in v (0..8).
uint8_t active_bits(uint8_t v) {
    uint8_t count = 0;
    for (; v != 0; v >>= 1) {
        count += v & 1;
    }
    // Return the tally, not v: the loop has shifted v all the way down to 0.
    return count;
}
// Widen the recorded [sensor_min, sensor_max] range of sensor slot i so that
// it includes the raw reading v.
void adjust_calibration(uint16_t v, uint8_t i) {
    uint16_t &lowest = sensor_min[i];
    uint16_t &highest = sensor_max[i];
    if (lowest > v) {
        lowest = v;
    }
    if (highest < v) {
        highest = v;
    }
}
// Publish the availability of one moisture sensor.
//
// The topic is TOPIC_MOISTURE_SENSORS, followed by the address rendered as a
// single character, followed by "/available".
//
// address:   sensor address, formatted into the topic with %c.
// available: non-zero publishes "online", zero publishes "offline".
void publish_availability(uint8_t address, uint8_t available) {
    char topic[60];
    // Bounded formatting: an over-long topic is truncated instead of
    // overrunning the buffer.
    snprintf(topic, sizeof(topic), "%s%c/available", TOPIC_MOISTURE_SENSORS, address);
    client.publish(topic, available ? "online" : "offline");
}
// Publish one sensor value as a decimal string.
//
// The topic is TOPIC_MOISTURE_SENSORS, followed by the address rendered as a
// single character, followed by "/value".
//
// address: sensor address, formatted into the topic with %c.
// v:       value to publish (update_sensors() passes the reading scaled to 0..100).
void publish_sensor_value(uint8_t address, uint16_t v) {
    char topic[60];
    char text[10];
    // Bounded formatting: an over-long topic is truncated instead of
    // overrunning the buffer.
    snprintf(topic, sizeof(topic), "%s%c/value", TOPIC_MOISTURE_SENSORS, address);
    snprintf(text, sizeof(text), "%d", v);
    client.publish(topic, text);
}
// Poll one sensor over the serial RS485 link.
//
// Sends the one-byte address, then waits for a two-byte big-endian reply.
// Returns the 16-bit reading, or RS485_ERROR_TIMEOUT when fewer than two
// bytes arrive before the serial timeout expires.
uint16_t read_sensor(uint8_t address) {
    // Raise the output-enable line only while the address byte is sent.
    digitalWrite(RS485_OUTPUT_ENABLE, HIGH);
    Serial.write(address);
    Serial.flush();
    digitalWrite(RS485_OUTPUT_ENABLE, LOW);

    // The receiver is always enabled, so the first byte read back is the
    // address we just sent; discard it.
    Serial.read();

    // Blocking read of the reply (timeout defaults to 1000 ms); readBytes()
    // reports how many bytes actually arrived.
    uint8_t reply[2];
    const size_t received = Serial.readBytes(reply, 2);
    if (received >= 2) {
        // First byte is the high half, second byte the low half.
        const uint16_t high = (uint16_t)reply[0] << 8;
        return high | reply[1];
    }
    // This value (0xFFFF) can never be an ADC result, so it serves as the error code.
    return RS485_ERROR_TIMEOUT;
}
// Poll every sensor once and publish the results.
//
// For each sensor that answers, the calibration extremes are widened with the
// raw reading, the sensor is reported online, and the reading is published
// scaled to 0..100 between the recorded minimum and maximum. A sensor whose
// reading is above RS485_MAX_INT (the timeout code) is reported offline.
void update_sensors() {
    // Make sure the UART input buffer is empty so that each reply is matched
    // to the request that caused it.
    while (Serial.available() > 0) {
        Serial.read();
    }

    for (uint8_t i = 0; i < SENSORS_COUNT; i++) {
        const uint8_t address = (uint8_t)SENSOR_ADDR_START + i;
        const uint16_t raw = read_sensor(address);

        if (raw > RS485_MAX_INT) {
            publish_availability(address, OFFLINE);
            continue;
        }

        adjust_calibration(raw, i);
        publish_availability(address, ONLINE);

        // Right after the extremes were (re)initialised the first reading
        // makes min == max. map() divides by (max - min), so scaling is
        // skipped until a real range exists instead of dividing by zero.
        if (sensor_max[i] > sensor_min[i]) {
            const uint16_t percent = map(raw, sensor_min[i], sensor_max[i], 0, 100);
            publish_sensor_value(address, percent);
        }
    }
}
// Configure every PORTA channel of the GPIO expander as an output.
void gpio_expander_setup() {
    const uint8_t all_outputs = 0x00; // Value written to the direction register
    Wire.beginTransmission(GPIO_EXPANDER_ADDR);
    Wire.write(GPIO_EXPANDER_DIR_A);
    Wire.write(all_outputs);
    Wire.endTransmission();
}
// Set one PORTA output bit and write the whole port byte to the expander.
//
// pin: bit index 0..7; any other value is ignored.
// I2C failures are reported on TOPIC_LOG. gpioa_state keeps the new bit even
// when the write fails.
void gpio_expander_set_pin(int pin) {
    // Only bits 0..7 exist in the 8-bit port state; rejecting negative values
    // also keeps the shift below well defined.
    if (pin < 0 || pin > 7)
        return;

    gpioa_state |= static_cast<uint8_t>(1u << pin);

    Wire.beginTransmission(GPIO_EXPANDER_ADDR);
    Wire.write(GPIO_EXPANDER_PORT_A);
    Wire.write(gpioa_state);
    const uint8_t i2c_error = Wire.endTransmission();
    if (i2c_error == I2C_SUCCESS)
        return;

    char m[50];
    if (i2c_error == I2C_ADDRESS_NACK) {
        snprintf(m, sizeof(m), "ERROR: No device at address %X", GPIO_EXPANDER_ADDR);
        client.publish(TOPIC_LOG, m);
    } else if (i2c_error == I2C_DATA_NACK) {
        client.publish(TOPIC_LOG, "ERROR: Invalid data on I2C bus");
    } else {
        // Any other status used to be dropped silently; report it as well.
        snprintf(m, sizeof(m), "ERROR: I2C write failed (code %u)", static_cast<unsigned>(i2c_error));
        client.publish(TOPIC_LOG, m);
    }
}
// Clear one PORTA output bit and write the whole port byte to the expander.
//
// pin: bit index 0..7; any other value is ignored.
// I2C failures are reported on TOPIC_LOG. gpioa_state keeps the cleared bit
// even when the write fails.
void gpio_expander_clear_pin(int pin) {
    // Only bits 0..7 exist in the 8-bit port state; rejecting negative values
    // also keeps the shift below well defined.
    if (pin < 0 || pin > 7)
        return;

    gpioa_state &= static_cast<uint8_t>(~(1u << pin));

    Wire.beginTransmission(GPIO_EXPANDER_ADDR);
    Wire.write(GPIO_EXPANDER_PORT_A);
    Wire.write(gpioa_state);
    const uint8_t i2c_error = Wire.endTransmission();
    if (i2c_error == I2C_SUCCESS)
        return;

    char m[50];
    if (i2c_error == I2C_ADDRESS_NACK) {
        snprintf(m, sizeof(m), "ERROR: No device at address %X", GPIO_EXPANDER_ADDR);
        client.publish(TOPIC_LOG, m);
    } else if (i2c_error == I2C_DATA_NACK) {
        client.publish(TOPIC_LOG, "ERROR: Invalid data on I2C bus");
    } else {
        // Any other status used to be dropped silently; report it as well.
        snprintf(m, sizeof(m), "ERROR: I2C write failed (code %u)", static_cast<unsigned>(i2c_error));
        client.publish(TOPIC_LOG, m);
    }
}
void recv_callback(char* topic, byte* payload, unsigned int length) {
if (!strcmp(topic, TOPIC_PUMP_COMMAND)) { // strcmp returns 0 if the strings are equal
// The payload is in the form {"id":<id>;"enable":<enable>;"timeout":<timeout>}
const int capacity = JSON_OBJECT_SIZE(3);
StaticJsonDocument<capacity> p;
DeserializationError err = deserializeJson(p, payload);
if (err == DeserializationError::Ok) {
uint8_t pump_id = p["id"].as<int>();
if (pump_id > PUMPS_COUNT - 1) {
client.publish(TOPIC_LOG, "ERROR: Invalid pump id");
client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_INVALID_PUMP_ID);
return;
}
if (p["enable"].as<int>()) {
int t = 30;
if (p["timeout"].as<int>() < MAX_PUMP_TIMEOUT)
t = p["timeout"].as<int>();
if ((gpioa_state >> pump_id) & 1) {
client.publish(TOPIC_LOG, "WARNING: Selected pump is already on");
client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_PUMP_ALREADY_ON);
return;
}
if (active_bits(gpioa_state) > MAX_ACTIVE_PUMPS) {
client.publish(TOPIC_LOG, "ERROR: Too many pumps are already active");
client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_TOO_MANY_PUMPS);
return;
}
gpio_expander_set_pin(pump_id);
timeouts[pump_id] = timeClient.getEpochTime() + t;
// Log operation
char log[50];
sprintf(log, "INFO: Turned on pump %d for %d seconds", pump_id + 1, t);
client.publish(TOPIC_LOG, log);
client.publish(TOPIC_COMMAND_RESULT, CMD_OK);
} else {
if (!((gpioa_state >> pump_id) & 1)) {
client.publish(TOPIC_LOG, "WARNING: Selected pump is already off");
return;
}
gpio_expander_clear_pin(pump_id);
// Log operation
char log[50];
sprintf(log, "INFO: Turned off pump %d", pump_id + 1);
client.publish(TOPIC_LOG, log);
client.publish(TOPIC_COMMAND_RESULT, CMD_OK);
}
} else {
client.publish(TOPIC_LOG, "ERROR: Invalid JSON");
client.publish(TOPIC_COMMAND_RESULT, PUMP_CMD_ERR_INVALID_JSON);
}
} else if (!strcmp(topic, TOPIC_SENSOR_BURST)){
if (length > 4) {
client.publish(TOPIC_LOG, "ERROR: Invalid sensor burst iteration count. Max iterations: 9999");
client.publish(TOPIC_COMMAND_RESULT, BURST_CMD_TOO_MANY_ITERATIONS);
return;
}
char pstr[5];
uint8_t i;
for (i = 0; i < length; i++) {
pstr[i] = (char)payload[i];
}
pstr[i] = 0;
sensor_burst = atoi((char*)pstr);
client.publish(TOPIC_COMMAND_RESULT, CMD_OK);
} else if (!strcmp(topic, TOPIC_CALIBRATION_RESET)) {
if (length > 1) {
client.publish(TOPIC_LOG, "ERROR: Only signle-byte addresses allowed");
client.publish(TOPIC_COMMAND_RESULT, CALRST_CMD_INVALID_SENSOR_ID);
return;
}
uint8_t i = (uint8_t)payload[0] - SENSOR_ADDR_START;
sensor_max[i] = 0;
sensor_min[i] = RS485_MAX_INT;
char m[50];
sprintf(m, "Cal of sensor at %X reset", i+SENSOR_ADDR_START);
client.publish(TOPIC_LOG, m);
client.publish(TOPIC_COMMAND_RESULT, CMD_OK);
} else {
// Received a message on a topic we didn't subscribe to hmm
}
}
void setup() {
for (int i = 0; i < SENSORS_COUNT; i++) {
sensor_max[i] = 0;
sensor_min[i] = RS485_MAX_INT;
}
Serial.begin(115200);
WiFi.begin(ssid, password);
Wire.begin();
while (WiFi.status() != WL_CONNECTED) {
delay(500);
// Serial.print("Connecting to WiFi..");
}
// Serial.println("Connected to the WiFi network");
client.setServer(mqttServer, mqttPort);
client.setCallback(recv_callback);
while (!client.connected()) {
// Serial.println("Connecting to MQTT...");
if (client.connect("autowaterer")) {
// Serial.println("connected");
} else {
// Serial.print("failed with state ");
// Serial.print(client.state());
delay(2000);
}
}
gpio_expander_setup();
// Publish and subscribe examples
// client.publish("esp/test", "Hello from ESP8266");
client.subscribe(TOPIC_PUMP_COMMAND);
client.subscribe(TOPIC_SENSOR_BURST);
client.subscribe(TOPIC_CALIBRATION_RESET);
timeClient.begin();
}
// Main loop: service MQTT and NTP, switch off pumps whose timeout has passed,
// and refresh the sensors either in burst mode or at the regular interval.
void loop() {
    client.loop(); // Should be called regularly
    timeClient.update();
    const unsigned long now = timeClient.getEpochTime();

    for (int pump = 0; pump < PUMPS_COUNT; ++pump) {
        const bool running = (gpioa_state >> pump) & 1;
        // Skip pumps that are off or whose deadline has not passed yet.
        if (!running || timeouts[pump] >= now) {
            continue;
        }
        gpio_expander_clear_pin(pump);
        char msg[50];
        sprintf(msg, "INFO: Turned off pump %d", pump + 1);
        client.publish(TOPIC_LOG, msg);
    }

    if (sensor_burst > 0) {
        // Burst mode: one update per pass (the delay below makes that roughly
        // every 200 ms) until the counter runs out.
        sensor_burst--;
        update_sensors();
    } else if (now - last_sensor_update > SENSORS_UPDATE_INTERVAL) {
        last_sensor_update = now;
        update_sensors();
    }

    delay(200);
}