Pierre-Olivier Mercier
610275ca1d
Some checks failed
continuous-integration/drone/push Build is failing
314 lines
6.1 KiB
Go
314 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"syscall"
|
|
"time"
|
|
"unicode"
|
|
|
|
"github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
|
"github.com/tarm/serial"
|
|
)
|
|
|
|
var myZone = ""
|
|
|
|
func readSerial(s *serial.Port, c chan []byte) {
|
|
var unread bytes.Buffer
|
|
buf := make([]byte, 128)
|
|
|
|
for {
|
|
n, err := s.Read(buf)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
unread.Write(buf[:n])
|
|
|
|
for {
|
|
line, err := unread.ReadBytes(2)
|
|
if err == io.EOF {
|
|
if _, err = unread.Write(line); err != nil {
|
|
log.Println(err)
|
|
}
|
|
break
|
|
} else if err != nil {
|
|
log.Println(err)
|
|
break
|
|
}
|
|
|
|
c <- bytes.TrimRightFunc(line, unicode.IsControl)
|
|
}
|
|
}
|
|
}
|
|
|
|
func defaultTags() map[string]string {
|
|
ret := map[string]string{}
|
|
|
|
if myZone != "" {
|
|
ret["zone"] = myZone
|
|
}
|
|
|
|
if host, err := os.Hostname(); err == nil {
|
|
ret["host"] = host
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func fillUnit(tags *map[string]string, key string) {
|
|
if v, ok := MeasurementUnits[key]; ok {
|
|
(*tags)["unit"] = v
|
|
}
|
|
}
|
|
|
|
func getValue(key string, data []byte) interface{} {
|
|
if _, ok := MeasurementUnits[key]; ok {
|
|
ret, _ := strconv.Atoi(string(data))
|
|
return ret
|
|
} else {
|
|
return string(data)
|
|
}
|
|
}
|
|
|
|
func genMeasurements(m map[string][]byte, horodate time.Time) (points []*write.Point) {
|
|
measurmentdone := []string{}
|
|
|
|
// Treat MeasurementGroups
|
|
for kgrp, grp := range MeasurementGroups {
|
|
fields := map[string]interface{}{}
|
|
|
|
for _, k := range grp {
|
|
measurmentdone = append(measurmentdone, k)
|
|
|
|
if v, ok := m[k]; ok {
|
|
fields[k] = getValue(k, v)
|
|
}
|
|
}
|
|
|
|
if len(fields) == 0 {
|
|
continue
|
|
}
|
|
|
|
tags := defaultTags()
|
|
fillUnit(&tags, grp[0])
|
|
|
|
points = append(points, influxdb2.NewPoint(kgrp,
|
|
tags,
|
|
fields,
|
|
horodate,
|
|
))
|
|
}
|
|
|
|
// Treat int
|
|
munits:
|
|
for k := range MeasurementUnits {
|
|
if _, ok := m[k]; !ok {
|
|
continue
|
|
}
|
|
|
|
for _, mg := range measurmentdone {
|
|
if mg == k {
|
|
continue munits
|
|
}
|
|
}
|
|
|
|
tags := defaultTags()
|
|
fillUnit(&tags, k)
|
|
|
|
points = append(points, influxdb2.NewPoint(k,
|
|
tags,
|
|
map[string]interface{}{"value": getValue(k, m[k])},
|
|
horodate,
|
|
))
|
|
}
|
|
|
|
// Treat string
|
|
for _, k := range MeasurementStrings {
|
|
if _, ok := m[k]; !ok {
|
|
continue
|
|
}
|
|
|
|
tags := defaultTags()
|
|
fillUnit(&tags, k)
|
|
|
|
points = append(points, influxdb2.NewPoint(k,
|
|
tags,
|
|
map[string]interface{}{"value": getValue(k, m[k])},
|
|
horodate,
|
|
))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func treatFrames(frames chan []byte, client influxdb2.Client) {
|
|
first := true
|
|
writeAPI := client.WriteAPIBlocking("", "teleinfo/autogen")
|
|
fframe:
|
|
for {
|
|
frame := <-frames
|
|
|
|
// Skip the first frame because it's not complete
|
|
if first {
|
|
first = false
|
|
continue
|
|
}
|
|
|
|
points := []*write.Point{}
|
|
|
|
var defaultHorodate time.Time
|
|
|
|
m := map[string][]byte{}
|
|
|
|
for _, line := range bytes.Split(frame, []byte("\r\n")) {
|
|
key, horodate, data, err := treatLine(line)
|
|
|
|
if err != nil {
|
|
log.Println(err)
|
|
continue
|
|
}
|
|
|
|
// Skip ADCO, this is the Linky address, confidential and unrelevant
|
|
if key == "ADCO" {
|
|
continue
|
|
}
|
|
|
|
if key == "DATE" {
|
|
if horodate == nil {
|
|
continue fframe
|
|
}
|
|
defaultHorodate = *horodate
|
|
continue
|
|
}
|
|
|
|
if horodate == nil {
|
|
m[key] = data
|
|
} else {
|
|
tags := defaultTags()
|
|
fillUnit(&tags, key)
|
|
|
|
points = append(points, influxdb2.NewPoint(key,
|
|
tags,
|
|
map[string]interface{}{"value": getValue(key, data)},
|
|
*horodate,
|
|
))
|
|
}
|
|
}
|
|
|
|
points = append(points, genMeasurements(m, defaultHorodate)...)
|
|
|
|
err := writeAPI.WritePoint(context.Background(), points...)
|
|
if err != nil {
|
|
log.Println("Unable to write points:", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func computeChecksum(area []byte) (checksum byte) {
|
|
for _, c := range area {
|
|
checksum += c
|
|
}
|
|
|
|
checksum = (checksum & 0x3F) + 0x20
|
|
|
|
return
|
|
}
|
|
|
|
func getHorodate(fields *[][]byte) (*time.Time, error) {
|
|
if (len(*fields) == 4 || string((*fields)[0]) == "DATE") && len((*fields)[1]) == 13 {
|
|
horodate, err := time.Parse("060102150405", string((*fields)[1][1:]))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Handle "saison"
|
|
if (*fields)[1][0] == 'E' || (*fields)[1][0] == 'e' {
|
|
horodate = horodate.Add(-2 * time.Hour)
|
|
} else {
|
|
horodate = horodate.Add(-1 * time.Hour)
|
|
}
|
|
|
|
// Mark field as treated
|
|
*fields = append((*fields)[:1], (*fields)[2:]...)
|
|
|
|
return &horodate, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func treatLine(line []byte) (key string, horodate *time.Time, data []byte, err error) {
|
|
line = bytes.TrimSpace(line)
|
|
|
|
if len(line) <= 1 {
|
|
return
|
|
}
|
|
|
|
if computeChecksum(line[:len(line)-1]) != line[len(line)-1] {
|
|
log.Printf("BAD checksum on %s: calculated: %c\n", line, computeChecksum(line[:len(line)-1]))
|
|
return
|
|
}
|
|
|
|
fields := bytes.Fields(line)
|
|
|
|
key = string(fields[0])
|
|
|
|
horodate, err = getHorodate(&fields)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
data = bytes.Join(fields[1:len(fields)-1], []byte{' '})
|
|
|
|
return
|
|
}
|
|
|
|
func main() {
|
|
var influxScheme = flag.String("influx-scheme", "http", "Scheme to use to contact InfluxDB")
|
|
var influxHost = flag.String("influx-host", "localhost", "Host where lives InfluxDB")
|
|
var influxPort = flag.Uint("influx-port", 8086, "Port where InfluxDB is accessible")
|
|
var influxUser = flag.String("influx-username", "influx", "Username to use to perform authentication")
|
|
var influxPass = flag.String("influx-password", "", "Password to use to perform authentication")
|
|
flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
|
|
flag.Parse()
|
|
|
|
if len(flag.Args()) < 1 {
|
|
log.Println("missing required argument: serial device (eg. /dev/ttyUSB0)")
|
|
return
|
|
}
|
|
|
|
config := &serial.Config{
|
|
Name: flag.Args()[0],
|
|
Baud: 9600,
|
|
Size: 7,
|
|
Parity: serial.ParityNone,
|
|
StopBits: 1,
|
|
}
|
|
|
|
s, err := serial.OpenPort(config)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
client := influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", *influxScheme, *influxHost, *influxPort), fmt.Sprintf("%s:%s", *influxUser, *influxPass))
|
|
|
|
frames := make(chan []byte)
|
|
go readSerial(s, frames)
|
|
go treatFrames(frames, client)
|
|
|
|
interrupt := make(chan os.Signal, 1)
|
|
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
|
|
<-interrupt
|
|
|
|
client.Close()
|
|
}
|