linky2influx/main.go

315 lines
6.1 KiB
Go
Raw Normal View History

2021-02-03 22:26:03 +00:00
package main
import (
2021-02-03 23:50:39 +00:00
"bytes"
2021-02-04 02:31:27 +00:00
"context"
2021-02-03 22:26:03 +00:00
"flag"
2021-02-03 23:58:03 +00:00
"fmt"
2021-02-03 23:50:39 +00:00
"io"
2021-02-03 22:26:03 +00:00
"log"
"os"
"os/signal"
2021-02-04 02:31:27 +00:00
"strconv"
2021-02-03 22:26:03 +00:00
"syscall"
"time"
2021-02-03 23:50:39 +00:00
"unicode"
2021-02-03 22:26:03 +00:00
2021-02-04 02:31:27 +00:00
"github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
2021-02-03 22:26:03 +00:00
"github.com/tarm/serial"
)
2021-02-04 02:31:27 +00:00
// myZone is the optional value of the "zone" tag that defaultTags attaches to
// points. It is filled from the -tag-zone command-line flag in main and is
// ignored when left empty.
var myZone = ""
2021-02-03 23:50:39 +00:00
func readSerial(s *serial.Port, c chan []byte) {
var unread bytes.Buffer
buf := make([]byte, 128)
2021-02-03 22:26:03 +00:00
2021-02-03 23:50:39 +00:00
for {
n, err := s.Read(buf)
if err != nil {
log.Fatal(err)
}
unread.Write(buf[:n])
for {
2021-02-03 23:58:03 +00:00
line, err := unread.ReadBytes(2)
2021-02-03 23:50:39 +00:00
if err == io.EOF {
if _, err = unread.Write(line); err != nil {
log.Println(err)
}
break
} else if err != nil {
log.Println(err)
break
}
2021-02-03 23:58:03 +00:00
c <- bytes.TrimRightFunc(line, unicode.IsControl)
}
}
}
2021-02-04 02:31:27 +00:00
// defaultTags returns a fresh tag map holding the tags common to all points:
// "host" (when the hostname can be resolved) and "zone" (when myZone is set).
func defaultTags() map[string]string {
	tags := make(map[string]string)

	if hostname, err := os.Hostname(); err == nil {
		tags["host"] = hostname
	}

	if myZone != "" {
		tags["zone"] = myZone
	}

	return tags
}
// fillUnit sets the "unit" tag in *tags to the entry MeasurementUnits holds for
// key. The map is left untouched when key has no entry.
func fillUnit(tags *map[string]string, key string) {
	unit, known := MeasurementUnits[key]
	if !known {
		return
	}

	(*tags)["unit"] = unit
}
// getValue converts the raw payload of a line into the value stored in
// InfluxDB. Keys present in MeasurementUnits are parsed as base-10 integers;
// every other key is returned as a plain string.
//
// A payload that cannot be parsed is reported in the log instead of being
// dropped silently. The value produced by strconv.Atoi (0 on a syntax error)
// is still returned so the field keeps its integer type.
func getValue(key string, data []byte) interface{} {
	if _, ok := MeasurementUnits[key]; !ok {
		return string(data)
	}

	n, err := strconv.Atoi(string(data))
	if err != nil {
		log.Printf("unable to parse %q as an integer for %s: %v", data, key, err)
	}
	return n
}
// genMeasurements turns the non-horodated values of one frame (m, indexed by
// label) into InfluxDB points that all carry the timestamp horodate.
//
// Three passes are made:
//   - every entry of MeasurementGroups yields one point whose fields are the
//     group members found in m (groups with no member present are skipped);
//   - every key of MeasurementUnits present in m and not part of a group yields
//     a point with a single "value" field;
//   - every entry of MeasurementStrings present in m yields a point with a
//     single "value" field.
func genMeasurements(m map[string][]byte, horodate time.Time) (points []*write.Point) {
	// Labels that belong to a group, whether or not they appear in this frame.
	grouped := make(map[string]bool)

	// Grouped measurements.
	for name, members := range MeasurementGroups {
		fields := map[string]interface{}{}
		for _, label := range members {
			grouped[label] = true
			if raw, present := m[label]; present {
				fields[label] = getValue(label, raw)
			}
		}

		if len(fields) > 0 {
			tags := defaultTags()
			// The unit of the group is taken from its first member.
			fillUnit(&tags, members[0])
			points = append(points, influxdb2.NewPoint(name, tags, fields, horodate))
		}
	}

	// Standalone integer measurements.
	for label := range MeasurementUnits {
		raw, present := m[label]
		if !present || grouped[label] {
			continue
		}

		tags := defaultTags()
		fillUnit(&tags, label)
		value := map[string]interface{}{"value": getValue(label, raw)}
		points = append(points, influxdb2.NewPoint(label, tags, value, horodate))
	}

	// String measurements.
	for _, label := range MeasurementStrings {
		raw, present := m[label]
		if !present {
			continue
		}

		tags := defaultTags()
		fillUnit(&tags, label)
		value := map[string]interface{}{"value": getValue(label, raw)}
		points = append(points, influxdb2.NewPoint(label, tags, value, horodate))
	}

	return points
}
2022-07-29 10:22:37 +00:00
func treatFrames(frames chan []byte, client influxdb2.Client, org string, bucket string) {
2021-02-03 23:58:03 +00:00
first := true
2022-07-29 10:22:37 +00:00
writeAPI := client.WriteAPIBlocking(org, bucket)
2021-02-10 01:21:15 +00:00
fframe:
2021-02-03 23:58:03 +00:00
for {
frame := <-frames
// Skip the first frame because it's not complete
if first {
first = false
continue
}
2021-02-04 02:31:27 +00:00
points := []*write.Point{}
var defaultHorodate time.Time
m := map[string][]byte{}
2021-02-03 23:58:03 +00:00
for _, line := range bytes.Split(frame, []byte("\r\n")) {
key, horodate, data, err := treatLine(line)
if err != nil {
log.Println(err)
continue
}
2021-02-04 02:31:27 +00:00
// Skip ADCO, this is the Linky address, confidential and unrelevant
if key == "ADCO" {
continue
}
if key == "DATE" {
2021-02-10 01:21:15 +00:00
if horodate == nil {
continue fframe
}
2021-02-04 02:31:27 +00:00
defaultHorodate = *horodate
continue
}
if horodate == nil {
m[key] = data
} else {
tags := defaultTags()
fillUnit(&tags, key)
points = append(points, influxdb2.NewPoint(key,
tags,
map[string]interface{}{"value": getValue(key, data)},
*horodate,
))
}
}
points = append(points, genMeasurements(m, defaultHorodate)...)
err := writeAPI.WritePoint(context.Background(), points...)
if err != nil {
log.Println("Unable to write points:", err)
2021-02-03 23:50:39 +00:00
}
}
}
// computeChecksum returns the checksum character of area: the sum of all its
// bytes, reduced to its six low-order bits and offset by 0x20, which always
// yields a value between 0x20 and 0x5F.
func computeChecksum(area []byte) (checksum byte) {
	var sum uint
	for i := range area {
		sum += uint(area[i])
	}

	// Only the six low bits matter, so the width of the accumulator is irrelevant.
	return byte(sum&0x3F) + 0x20
}
2021-02-03 23:58:03 +00:00
// getHorodate extracts the timestamp ("horodate") carried by a line, if any.
//
// fields points to the whitespace-separated fields of the line. A timestamp is
// recognized when the line has exactly four fields or its label is "DATE", and
// its second field is 13 bytes long: one season marker followed by a
// YYMMDDhhmmss date. The date is parsed as UTC and then shifted back by two
// hours when the marker is 'E' or 'e', and by one hour otherwise.
//
// On success the timestamp field is removed from *fields and a pointer to the
// parsed time is returned. When the line carries no timestamp, or has fewer
// than two fields, it returns (nil, nil) and leaves *fields untouched. A date
// that cannot be parsed is returned as an error.
func getHorodate(fields *[][]byte) (*time.Time, error) {
	f := *fields

	// Guard against truncated lines: the checks below index f[0] and f[1].
	if len(f) < 2 {
		return nil, nil
	}

	if (len(f) != 4 && string(f[0]) != "DATE") || len(f[1]) != 13 {
		return nil, nil
	}

	stamp := f[1]
	horodate, err := time.Parse("060102150405", string(stamp[1:]))
	if err != nil {
		return nil, err
	}

	// Handle the season marker: 'E'/'e' shifts by two hours, anything else by one.
	if stamp[0] == 'E' || stamp[0] == 'e' {
		horodate = horodate.Add(-2 * time.Hour)
	} else {
		horodate = horodate.Add(-1 * time.Hour)
	}

	// Mark the field as treated by dropping it from the caller's slice.
	*fields = append(f[:1], f[2:]...)

	return &horodate, nil
}
2021-02-04 02:31:27 +00:00
func treatLine(line []byte) (key string, horodate *time.Time, data []byte, err error) {
2021-02-03 23:58:03 +00:00
line = bytes.TrimSpace(line)
2021-02-03 23:50:39 +00:00
2021-02-03 23:58:03 +00:00
if len(line) <= 1 {
return
}
2021-02-03 23:50:39 +00:00
2021-02-03 23:58:03 +00:00
if computeChecksum(line[:len(line)-1]) != line[len(line)-1] {
log.Printf("BAD checksum on %s: calculated: %c\n", line, computeChecksum(line[:len(line)-1]))
return
}
2021-02-03 23:50:39 +00:00
2021-02-03 23:58:03 +00:00
fields := bytes.Fields(line)
2021-02-03 23:50:39 +00:00
2021-02-03 23:58:03 +00:00
key = string(fields[0])
2021-02-03 23:51:04 +00:00
2021-02-03 23:58:03 +00:00
horodate, err = getHorodate(&fields)
if err != nil {
return
2021-02-03 23:50:39 +00:00
}
2021-02-03 23:58:03 +00:00
2021-02-04 02:31:27 +00:00
data = bytes.Join(fields[1:len(fields)-1], []byte{' '})
2021-02-03 23:58:03 +00:00
return
2021-02-03 22:26:03 +00:00
}
func main() {
2021-02-04 02:31:27 +00:00
var influxScheme = flag.String("influx-scheme", "http", "Scheme to use to contact InfluxDB")
var influxHost = flag.String("influx-host", "localhost", "Host where lives InfluxDB")
var influxPort = flag.Uint("influx-port", 8086, "Port where InfluxDB is accessible")
2022-07-29 10:22:37 +00:00
var influxOrganization = flag.String("influx-organization", "", "Organization to use")
var influxBucket = flag.String("influx-bucket", "teleinfo", "Bucket where store data")
var influxToken = flag.String("influx-token", "", "Token use to perform authentication")
2021-02-04 02:31:27 +00:00
flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
2021-02-03 22:26:03 +00:00
flag.Parse()
if len(flag.Args()) < 1 {
log.Println("missing required argument: serial device (eg. /dev/ttyUSB0)")
return
}
config := &serial.Config{
Name: flag.Args()[0],
Baud: 9600,
Size: 7,
Parity: serial.ParityNone,
StopBits: 1,
}
s, err := serial.OpenPort(config)
if err != nil {
log.Fatal(err)
}
2022-07-29 10:22:37 +00:00
client := influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", *influxScheme, *influxHost, *influxPort), *influxToken)
2021-02-04 02:31:27 +00:00
2021-02-03 23:58:03 +00:00
frames := make(chan []byte)
go readSerial(s, frames)
2022-07-29 10:22:37 +00:00
go treatFrames(frames, client, *influxOrganization, *influxBucket)
2021-02-03 22:26:03 +00:00
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
<-interrupt
2021-02-04 02:31:27 +00:00
client.Close()
2021-02-03 22:26:03 +00:00
}