Working from How to Transform a RaspberryPi Into a Universal Zigbee and Z-Wave Bridge since RaspberryPi is Debian based. (I am running Debian 10 on a Mac Mini 3,1)
Bought a cheapo CC2531
USB ZigBee thing pre-flashed and plugged it in and it showed up.
# tail /var/log/syslog Jan 25 12:58:40 mini31 kernel: [4555605.979593] usb 3-3: new full-speed USB device number 5 using ohci-pci Jan 25 12:58:40 mini31 kernel: [4555606.228490] usb 3-3: New USB device found, idVendor=0451, idProduct=16a8, bcdDevice= 0.09 Jan 25 12:58:40 mini31 kernel: [4555606.228494] usb 3-3: New USB device strings: Mfr=1, Product=2, SerialNumber=3 Jan 25 12:58:40 mini31 kernel: [4555606.228497] usb 3-3: Product: TI CC2531 USB CDC Jan 25 12:58:40 mini31 kernel: [4555606.228499] usb 3-3: Manufacturer: Texas Instruments Jan 25 12:58:40 mini31 kernel: [4555606.228501] usb 3-3: SerialNumber: __0X00124B0009EBD991 Jan 25 12:58:40 mini31 kernel: [4555606.249872] cdc_acm 3-3:1.0: ttyACM0: USB ACM device Jan 25 12:58:40 mini31 kernel: [4555606.252384] usbcore: registered new interface driver cdc_acm Jan 25 12:58:40 mini31 kernel: [4555606.252386] cdc_acm: USB Abstract Control Model driver for USB modems and ISDN adapters # lsusb | grep Texas Bus 003 Device 005: ID 0451:16a8 Texas Instruments, Inc. #
Note that /dev/ttyACM0
has appeared.
Then copied and pasted as root
, as you do...
# apt-get install mosquitto # systemctl start mosquitto.service # systemctl enable mosquitto.service
Is anything happening?
# screen -S mosquitto_sub mosquitto_sub -h localhost -p 1883 -t zigbee2mqtt/Home/# -F "%I %t %p"
At this point I could write a simple C# programme to see MQTT working.
using MQTTnet; using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Formatter; using MQTTnet.Protocol; namespace ConsoleAppMqtt { public static class Program { private static MqttFactory _F; private static IManagedMqttClient _Cpublisher; private static IManagedMqttClient _CsubscriberOdd; private static IManagedMqttClient _CsubscriberEven; private static IManagedMqttClient _Sys; private static MqttClientTlsOptions MqttClientTlsOptions = new MqttClientTlsOptions { UseTls = false, IgnoreCertificateChainErrors = true, IgnoreCertificateRevocationErrors = true, AllowUntrustedCertificates = true }; private static MqttClientOptions GetOptions(string clientName) { return new MqttClientOptions { ClientId = clientName, ProtocolVersion = MqttProtocolVersion.V311, ChannelOptions = new MqttClientTcpOptions { Server = "192.168.1.31", Port = 1883, TlsOptions = MqttClientTlsOptions }, CleanSession = true, // Don't store up messages while we're offline. KeepAlivePeriod = TimeSpan.FromSeconds(5) }; } public static void Main(string[] args) { _F = new MqttFactory(); // Publisher _Cpublisher = _F.CreateManagedMqttClient(); _Cpublisher.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _Cpublisher.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); _Cpublisher.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); _Cpublisher.ApplicationMessageProcessedHandler = new ApplicationMessageProcessedHandlerDelegate(MessageProcessedHandler); _Cpublisher.StartAsync(new ManagedMqttClientOptions { ClientOptions = GetOptions("ClientPublisher") }).Wait(); // Odd subscriber _CsubscriberOdd = _F.CreateManagedMqttClient(); _CsubscriberOdd.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _CsubscriberOdd.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); _CsubscriberOdd.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); _CsubscriberOdd.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(SubscriberMessageReceivedHandlerO); _CsubscriberOdd.StartAsync(new ManagedMqttClientOptions { ClientOptions = GetOptions("ClientOdd") }).Wait(); _CsubscriberOdd.SubscribeAsync(new MqttTopicFilter { Topic = "numbers/odd" }).Wait(); // Even subscriber. _CsubscriberEven = _F.CreateManagedMqttClient(); _CsubscriberEven.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _CsubscriberEven.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); _CsubscriberEven.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); _CsubscriberEven.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(SubscriberMessageReceivedHandlerE); _CsubscriberEven.StartAsync(new ManagedMqttClientOptions { ClientOptions = GetOptions("ClientEven") }).Wait(); _CsubscriberEven.SubscribeAsync(new MqttTopicFilter { Topic = "numbers/even" }).Wait(); // SYS subscriber. _Sys = _F.CreateManagedMqttClient(); _Sys.UseApplicationMessageReceivedHandler(HandleReceivedApplicationMessage); _Sys.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); _Sys.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); _Sys.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(SubscriberMessageReceivedHandlerS); _Sys.StartAsync(new ManagedMqttClientOptions { ClientOptions = GetOptions("ClientSYS") }).Wait(); //_Sys.SubscribeAsync(new MqttTopicFilter { Topic = "$SYS/#" }).Wait(); while (!_Cpublisher.IsConnected || !_CsubscriberOdd.IsConnected || !_CsubscriberEven.IsConnected || !_Sys.IsConnected) { Console.WriteLine("Waiting for connections"); Thread.Sleep(1000); } // Send some messages. for (int i = 1; i <= 13; i++) { MqttApplicationMessage? message = new MqttApplicationMessageBuilder() .WithTopic("numbers/" + (i % 2 == 0 ? "even" : "odd")) .WithPayload($"Message {i}.") .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce) .WithRetainFlag() // This means that the last message (12, 13) will be replayed immediately the next time we subscribe. .Build(); Console.WriteLine("-------- " + i + " --------"); _Cpublisher.PublishAsync(message).Wait(); Console.WriteLine("published"); Thread.Sleep(5000); } Thread.Sleep(1000); while (_Cpublisher.PendingApplicationMessagesCount > 0 || _CsubscriberOdd.PendingApplicationMessagesCount > 0 || _CsubscriberEven.PendingApplicationMessagesCount > 0) { Console.WriteLine("Waiting for PendingApplicationMessagesCount"); Thread.Sleep(1000); } // Stop. StopAsync().Wait(); Console.WriteLine("Done. Pressto quit."); Console.ReadLine(); } private static async Task StopAsync() { await _Cpublisher.StopAsync(); await _CsubscriberOdd.StopAsync(); await _CsubscriberEven.StopAsync(); await _Sys.StopAsync(); } private static void MessageProcessedHandler(ApplicationMessageProcessedEventArgs args) { // This appears to be the callback for when the publisher has received successfully. Console.WriteLine("MessageProcessedHandler (P): " + args.ApplicationMessage.ApplicationMessage.ConvertPayloadToString()); } private static void HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs args) { // This is never called. string item = $"Timestamp: {DateTime.Now:HH:mm:ss} | Topic: {args.ApplicationMessage.Topic} | Payload: {args.ApplicationMessage.ConvertPayloadToString()} | QoS: {args.ApplicationMessage.QualityOfServiceLevel}"; Console.WriteLine("HandleReceivedApplicationMessage: " + item); } private static void SubscriberMessageReceivedHandlerO(MqttApplicationMessageReceivedEventArgs args) { string item = $"Timestamp: {DateTime.Now:HH:mm:ss} | Topic: {args.ApplicationMessage.Topic} | Payload: {args.ApplicationMessage.ConvertPayloadToString()} | QoS: {args.ApplicationMessage.QualityOfServiceLevel}"; Console.WriteLine("SubscriberMessageReceivedHandlerO: " + item); } private static void SubscriberMessageReceivedHandlerE(MqttApplicationMessageReceivedEventArgs args) { string item = $"Timestamp: {DateTime.Now:HH:mm:ss} | Topic: {args.ApplicationMessage.Topic} | Payload: {args.ApplicationMessage.ConvertPayloadToString()} | QoS: {args.ApplicationMessage.QualityOfServiceLevel}"; Console.WriteLine("SubscriberMessageReceivedHandlerE: " + item); } private static void SubscriberMessageReceivedHandlerS(MqttApplicationMessageReceivedEventArgs args) { string item = $"Timestamp: {DateTime.Now:HH:mm:ss} | Topic: {args.ApplicationMessage.Topic} | Payload: {args.ApplicationMessage.ConvertPayloadToString()} | QoS: {args.ApplicationMessage.QualityOfServiceLevel}"; Console.WriteLine("SubscriberMessageReceivedHandlerS: " + item); } private static void OnConnected(MqttClientConnectedEventArgs args) { Console.WriteLine("Connected"); } private static void OnDisconnected(MqttClientDisconnectedEventArgs args) { Console.WriteLine("Disconnected"); } } }
Continuing... (Note the addition of npm
.)
# apt-get install nodejs npm git make g++ gcc
Not so fast! The Debian package for node is version 10 which is defunct, and npm install
will
complain about this.
Therefore it’s necessary to install a newer verson — 16 —
from NodeSource.
I actually got it to run under node 10
but it was a processor hog and had my CPU running 20 degrees
above what is normal. Upgrading to node 16
seems to have fixed this.
# cd # curl -fsSL https://deb.nodesource.com/setup_lts.x | bash - # apt-get install -y nodejs # node --version v16.13.2
The package is version 18 but current is 22.
zigbee2mqtt
version 2 is a complete fuckup — it does not work
with any USB adaptor unless you flash the firmware on the adaptor and fuck knows
how to do that. Therefore use version 1.42.0.
sudo mkdir /opt/zigbee2mqtt142 sudo chown yourself:yourself /opt/zigbee2mqtt142 git clone --depth 1 https://github.com/Koenkk/zigbee2mqtt.git /opt/zigbee2mqtt142 cd /opt/zigbee2mqtt142 git fetch --all --tags git checkout tags/1.42.0 npm install npm start # npm install
For /etc/systemd/system/zigbee2mqtt.service
:
[Unit] Description=zigbee2mqtt After=network.target [Service] ExecStart=/usr/bin/npm start WorkingDirectory=/opt/zigbee2mqtt StandardOutput=inherit StandardError=inherit Restart=always [Install] WantedBy=multi-user.target
...and then...
# mv /root/zigbee2mqtt /opt/zigbee2mqtt # systemctl daemon-reload # systemctl start zigbee2mqtt.service # systemctl enable zigbee2mqtt.service
If it complains that the port is in use the try removing and re-inserting the USB adapter; I seemed to have got an
instance of zigbee2mqtt
running already somehow and removing the hardware killed it.
If it simply doesn’t work then disable the service and run it on a screen at startup instead.
Use cronspec and this script.
#!/bin/sh s=$( screen -ls | grep zigbee2mqtt | wc -l ) if [ ! $s -gt 0 ]; then logger -s "zigbee2mqtt is already running on screen." exit 0 fi logger -s "zigbee2mqtt is starting on screen." /usr/bin/screen -dm -S zigbee2mqtt "cd /opt/zigbee2matt142; npm start;"
Don’t bother; zigbee2mqtt-frontend
seems to do the same and is
already installed with zigbee2mqtt
# apt-get install redis-server # systemctl start redis-server # systemctl enable redis-server # apt install python-pip # cd # pip3 install 'platypush[zigbee,http,mqtt]' # touch /etc/platypush/config.yaml # touch /etc/systemd/system/platypush.service # systemctl daemon-reload # systemctl start platypush.service # systemctl enable platypush.service
And now I have something running at http://192.168.1.31:8008
.
Github page for zigbee2mqtt-frontend unfortunately there is basically no documentation; they assume that you’re already a developer of it — very fraustrating
# git clone https://github.com/nurikk/zigbee2mqtt-frontend.git # cd zigbee2mqtt-frontend
Everyone wants dashboards; the solution appears to be influxDB and Grafana. Platypush appears to be useless.
Install influxdb
and telegraf
(copied from Influxdb download page).
# wget -qO- https://repos.influxdata.com/influxdb.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdb.gpg > /dev/null # export DISTRIB_ID=$(lsb_release -si); export DISTRIB_CODENAME=$(lsb_release -sc) # echo "deb [signed-by=/etc/apt/trusted.gpg.d/influxdb.gpg] https://repos.influxdata.com/${DISTRIB_ID,,} ${DISTRIB_CODENAME} stable" | sudo tee /etc/apt/sources.list.d/influxdb.list > /dev/null # apt-get update # apt-get install influxdb2 # systemctl unmask influxdb # systemctl start
Note that package influx-client
is for v1 and is incompatible with influxdb2
.
Furthermore the influx
cli for v2 has changed completely and is now useless.
Did it work?
# influx ping OK
The influx
command appears not to be able to do anything more.
A website has appeared at http://localhost:8086
!
It appears to have dashboards so maybe Grafana won’t be needed.
Now set up telegraf
to get data from mqtt
into influxdb
# wget https://dl.influxdata.com/telegraf/releases/telegraf_1.21.4-1_amd64.deb # dpkg -i telegraf_1.21.4-1_amd64.deb # telegraf --sample-config > telegraf.conf # systemctl start telegraf
The default telegraf.config
is set up to collect statistics (disc, cpu,...) about the machine it’s running on
so this needs commenting out.
Next, append to
/etc/telegraf/telegraf.conf
the following:
# Consumer for the temperature sensors. [[inputs.mqtt_consumer]] servers = ["tcp://127.0.0.1:1883"] topics = [ "zigbee2mqtt/Home/+/Temp",, "zigbee2mqtt/Home/+/Radiator/+" ] data_format = "json_v2" [[inputs.mqtt_consumer.json_v2]] measurement_name = "temperature" [[inputs.mqtt_consumer.topic_parsing]] topic = "zigbee2mqtt/Home/+/Temp" tags = "_/_/room/_" [[inputs.mqtt_consumer.json_v2.field]] path = "temperature" type = "float" # Consumer for the TRVs (which have a different JSON format). [[inputs.mqtt_consumer]] servers = ["tcp://127.0.0.1:1883"] topics = [ "zigbee2mqtt/Home/+/Radiator", ] data_format = "json_v2" [[inputs.mqtt_consumer.json_v2]] measurement_name = "temperature" [[inputs.mqtt_consumer.topic_parsing]] topic = "zigbee2mqtt/Home/+/Radiator" tags = "_/_/room/_" [[inputs.mqtt_consumer.topic_parsing]] topic = "zigbee2mqtt/Home/+/Radiator/+" tags = "_/_/room/_/position" [[inputs.mqtt_consumer.json_v2.field]] path = "local_temperature" rename = "temperature" type = "float" [[inputs.mqtt_consumer.json_v2]] measurement_name = "valve" [[inputs.mqtt_consumer.topic_parsing]] topic = "zigbee2mqtt/Home/+/Radiator" tags = "_/_/room/_" [[inputs.mqtt_consumer.topic_parsing]] topic = "zigbee2mqtt/Home/+/Radiator/+" tags = "_/_/room/_/position" [[inputs.mqtt_consumer.json_v2.field]] path = "position" rename = "valve" type = "float" [[outputs.influxdb_v2]] urls = ["http://127.0.0.1:8086"] token = "token" # Create in the InfluxDB UI. Will be needed for command line too. organization = "mini31" bucket = "mini31"
The radiator TRV is a Moes BRT-100-TRV
and whenever I set the day and time it reverts to an apparently random value. It is therefore useless.
This turned out to be a bug in zigbee2mqtt
that appears to have been fixed by version 1.27.
Here is a simple dashboard in the influxdb website. The gauges are set to aggregation latest
so they’re
effectively independent of the time period
The settings debug = true
and quiet = false
in the [agent]
section
turn on verbose logging.
I had lots of useless log spam from the [[inputs.disk]]
thing that comes out-of-the-box.
The error message contained paths that I could see in /proc/mounts
and adding
tmpfs
and ramfs
to the ignore_fs
parameter of [[inputs.disk]]
got them to fuck off.
Just set the retention policy on the bucket.
# influx config create --config-name mini31 --host-url http://127.0.0.1:8086 --org mini31 --token MyTopSecretToken --active # influx config list
This creates a file ~/.influxdbv2/configs
which contains the token.
# du -sh /var/lib/influxdb/ 290M /var/lib/influxdb/ # influx delete --org mini31 --bucket zigbee --start 2020-01-01T00:00:00.000Z --stop $(date --date '6 months ago' +"%Y-%m-%dT%H:%M:%SZ") # du -sh /var/lib/influxdb/ 290M /var/lib/influxdb/
Oh.
I want to get a text message telling me to open or close the windows whenever the outside temperature crosses the inside temperature.
The first task is to create a task to compute a new measure to flag this
import "join" option task = { name: "window_indicator_task", every: 13m, offset: 3m, } // Get outside data. o = from(bucket: "zigbee") |> range(start: -6h, stop: now()) //|> range(start: -6mo, stop: now()) |> filter(fn: (r) => r["room"] == "Outside") |> filter(fn: (r) => r["_measurement"] == "temperature") |> aggregateWindow(every: 30m, fn: mean, createEmpty: false) |> keep(columns: ["_time", "_value"]) os = o |> timeShift(duration: 30m) // Yes, plus. // Join. // join.time doesn't work becauase InfluxDB is a bug-ridden piece of shit. oj = join.inner( left: o, right: os, on: (l, r) => l._time == r._time, as: (l, r) => ({ _time: r._time, _value_o: l._value, _value_os: r._value, }) ) // Get inside data. i = from(bucket: "zigbee") |> range(start: -6h, stop: now()) |> filter(fn: (r) => (r["room"] == "Bed3" or r["room"] == "Bed1") and r["_measurement"] == "temperature") |> drop(columns: ["room"]) |> group() |> aggregateWindow(every: 30m, fn: mean, createEmpty: false) j = join.inner( // This also doesn't work because // panic: runtime error: invalid memory address or nil pointer dereference // Fucking piece of shit. left: oj, right: i, on: (l, r) => l._time == r._time, as: (l, r) => ({ _time: l._time, o: l._value_o, os: l._value_os, i: r._value, }) ) |> map( fn: (r) => ({r with d: if float(v: r.o) > 19 and float(v: r.o) < float(v: r.i) and float(v: r.os) < float(v: r.i) then // It's hot outside and outside is cooler but earlier it was hotter: open the windows. 1 else if float(v: r.o) > 19 and float(v: r.o) < float(v: r.i) and float(v: r.os) > float(v: r.i) then // It's hot outside, and outside is hotter than inside but earlier it was cooler: close the windows. -1 else 0, s: if float(v: r.i) > 19 and float(v: r.o) < float(v: r.i) then // If it's hot inside and outside is cooler then the windows should be open. "open" else "closed", }), ) // Output for task j |> map( fn: (r) => ({ _time: r._time, _measurement: "window_indicator", _field: "window_indicator", _value: r.d, }), ) |> to(bucket: "zigbee", org: "mini31") j |> map( fn: (r) => ({ _time: r._time, _measurement: "window_status", _field: "window_status", _value: r.s, }), ) |> to(bucket: "zigbee", org: "mini31")
Note that the query language is InfluxDB, and not TICKscript
— which looks similar.
The next task is to use the new computed measure to create check on the alerts page. When you use the UI to create a check it creates a task behind the scenes which doesn’ show on the tasks page. This is particularly irritating because htis means that you can’t click to see the history and logs of this alert task — you have to click to the history of a real task then paste the ID of the alert task into the address bar.
My window_indicator
is ±1 when something needs to be done else zero.
The alert notification enpoints in InfluxDB are complete shit; the workaround is to have nc
receive the HTML POSTs and then process them.
This script logs POST bodies to file, and scrapes the content to do the real message sending.
#!/bin/sh file="/root/nc/in.txt" date +"%Y-%m-%d %H:%M" >> "$file" while IFS= read -r line || [ ! -z "$line" ]; do echo "$line" >> "$file" wi=$(echo "$line" | sed -nE 's/.*"window_indicator":[[:space:]]*([-0-9.]+).*/\1/p') if [ "$wi" = "1" ]; then wget http://192.168.42.129:8082 --post-data='{"to":"+447950952279", "message": "Open the windows."}' --header "Authorization: fd1176dc" -q -O/dev/null elif [ "$wi" = "-1" ]; then wget http://192.168.42.129:8082 --post-data='{"to":"+447950952279", "message": "Close the windows."}' --header "Authorization: fd1176dc" -q -O/dev/null elif [! -z "$wi" ] && [ ! "$wi" = "0" ]; then wget http://192.168.42.129:8082 --post-data='{"to":"+447950952279", "message": "Unknown window indicator value: $wi."}' --header "Authorization: fd1176dc" -q -O/dev/null fi done; echo "--------------------------------------------------------------------------------" >> "$file
Run
# while true; do cat 201.txt | nc -l -p 8087 -q 1 | /root/nc/nc.sh; done
where 201.txt
is:
HTTP/1.1 201 Created Server: netcat@mini31 Content-Type: text/plain; charset=UTF-8 Created
and configure InfluxDB to post everything to http://127.0.0.1:8087
Finally, create a notification rule — this also has the secret task with inaccessile logs problem.
Some care may be necessary in choosing timings:
The while
loop can be made into a systemd
service.
/etc/systemd/system/influxdb-nc.service
:
[Service] Type=simple ExecStart=/root/nc/nc-run.sh PIDFile=/root/nc/systemd-InfouxDB-nc-service-pid Restart=always RestartSec=0 [Unit] Description=nc on port 8087 for InfluxDB HTTP notifications After=network.target Wants=network.target [Install] WantedBy=multi-user.target
And
# systemctl daemon-reload # systemctl enable influxdb-nc # systemctl start influxdb-nc