// Command teleinfo reads teleinfo frames from a serial device, decodes each
// line of the frame and writes the resulting measurements to InfluxDB.
package main

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"
	"unicode"

	"github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
	"github.com/tarm/serial"
)

// myZone is the optional "zone" tag added to every point (flag -tag-zone).
var myZone = ""

// readSerial reads raw bytes from s forever, cuts the stream after every
// 0x02 byte (ASCII STX) and sends each chunk, stripped of its trailing
// control characters, on c. A read error on the port terminates the program.
func readSerial(s *serial.Port, c chan []byte) {
	const frameDelim = 0x02

	var unread bytes.Buffer
	buf := make([]byte, 128)

	for {
		n, err := s.Read(buf)
		if err != nil {
			log.Fatal(err)
		}
		unread.Write(buf[:n])

		// Drain every complete chunk currently buffered.
		for {
			frame, err := unread.ReadBytes(frameDelim)
			if err == io.EOF {
				// No delimiter yet: ReadBytes consumed the partial chunk, so
				// put it back and wait for more bytes from the port.
				if _, err = unread.Write(frame); err != nil {
					log.Println(err)
				}
				break
			}
			if err != nil {
				log.Println(err)
				break
			}
			c <- bytes.TrimRightFunc(frame, unicode.IsControl)
		}
	}
}

// defaultTags returns a fresh tag set holding the configured zone (when set)
// and the local hostname (when it can be determined).
func defaultTags() map[string]string {
	tags := map[string]string{}
	if myZone != "" {
		tags["zone"] = myZone
	}
	if host, err := os.Hostname(); err == nil {
		tags["host"] = host
	}
	return tags
}

// fillUnit sets the "unit" tag in *tags when key has an entry in
// MeasurementUnits; otherwise tags is left untouched.
func fillUnit(tags *map[string]string, key string) {
	if unit, ok := MeasurementUnits[key]; ok {
		(*tags)["unit"] = unit
	}
}

// getValue converts the raw data of a line into a field value: an int for
// keys listed in MeasurementUnits, the data as a string for every other key.
// Data that cannot be parsed as an integer is logged and the value returned
// by strconv.Atoi (0 on a syntax error) is used, as before.
func getValue(key string, data []byte) interface{} {
	if _, numeric := MeasurementUnits[key]; !numeric {
		return string(data)
	}
	v, err := strconv.Atoi(string(data))
	if err != nil {
		log.Printf("unable to parse %s value %q as integer: %v", key, data, err)
	}
	return v
}

// newValuePoint builds a single-field ("value") point named after key, tagged
// with the default tags and the unit of key when one is known.
func newValuePoint(key string, data []byte, horodate time.Time) *write.Point {
	tags := defaultTags()
	fillUnit(&tags, key)
	return influxdb2.NewPoint(key, tags,
		map[string]interface{}{"value": getValue(key, data)},
		horodate,
	)
}

// genMeasurements turns the key/data pairs of one frame into points stamped
// with horodate. Keys belonging to a MeasurementGroups entry are merged into
// one multi-field point per group; remaining MeasurementUnits keys and
// MeasurementStrings keys each produce their own "value" point.
func genMeasurements(m map[string][]byte, horodate time.Time) (points []*write.Point) {
	// Keys already covered by a group, so they are not emitted a second time
	// as standalone unit measurements.
	grouped := map[string]struct{}{}

	// Treat MeasurementGroups
	for name, members := range MeasurementGroups {
		fields := map[string]interface{}{}
		for _, k := range members {
			grouped[k] = struct{}{}
			if v, ok := m[k]; ok {
				fields[k] = getValue(k, v)
			}
		}
		if len(fields) == 0 {
			continue
		}

		tags := defaultTags()
		// The unit of the group is the unit of its first member.
		fillUnit(&tags, members[0])
		points = append(points, influxdb2.NewPoint(name, tags, fields, horodate))
	}

	// Treat int
	for k := range MeasurementUnits {
		v, ok := m[k]
		if !ok {
			continue
		}
		if _, done := grouped[k]; done {
			continue
		}
		points = append(points, newValuePoint(k, v, horodate))
	}

	// Treat string
	for _, k := range MeasurementStrings {
		v, ok := m[k]
		if !ok {
			continue
		}
		points = append(points, newValuePoint(k, v, horodate))
	}

	return points
}

// framePoints decodes every line of frame and returns the points to write.
// It reports false when the frame must be dropped, which happens when its
// DATE line carries no usable timestamp.
func framePoints(frame []byte) ([]*write.Point, bool) {
	var (
		points          []*write.Point
		defaultHorodate time.Time
	)
	m := map[string][]byte{}

	for _, line := range bytes.Split(frame, []byte("\r\n")) {
		key, horodate, data, err := treatLine(line)
		if err != nil {
			log.Println(err)
			continue
		}

		switch {
		case key == "":
			// Empty line or line rejected by treatLine: nothing to record.
		case key == "ADCO":
			// Skip ADCO, this is the Linky address, confidential and irrelevant
		case key == "DATE":
			if horodate == nil {
				return nil, false
			}
			defaultHorodate = *horodate
		case horodate == nil:
			// No timestamp of its own: stamped later with the frame's DATE.
			m[key] = data
		default:
			points = append(points, newValuePoint(key, data, *horodate))
		}
	}

	return append(points, genMeasurements(m, defaultHorodate)...), true
}

// treatFrames consumes frames, converts each one into points and writes them
// to the "teleinfo/autogen" bucket through client. Write failures are logged
// and the loop carries on with the next frame.
func treatFrames(frames chan []byte, client influxdb2.Client) {
	writeAPI := client.WriteAPIBlocking("", "teleinfo/autogen")

	// Skip the first frame because it's not complete
	<-frames

	for frame := range frames {
		points, ok := framePoints(frame)
		if !ok || len(points) == 0 {
			continue
		}

		if err := writeAPI.WritePoint(context.Background(), points...); err != nil {
			log.Println("Unable to write points:", err)
		}
	}
}

// computeChecksum sums the bytes of area, keeps the 6 low-order bits of the
// sum and adds 0x20, yielding a byte in the range 0x20-0x5F.
func computeChecksum(area []byte) (checksum byte) {
	for _, c := range area {
		checksum += c
	}
	return (checksum & 0x3F) + 0x20
}

// getHorodate extracts the timestamp carried by a line, when there is one.
//
// A line is considered timestamped when it has exactly 4 fields or when its
// key is "DATE", and its second field is 13 bytes long: one season byte
// followed by a "YYMMDDhhmmss" date. The date is shifted by -2h when the
// season byte is 'E' or 'e' and by -1h otherwise. On success the timestamp
// field is removed from *fields. It returns (nil, nil) when the line carries
// no timestamp and a non-nil error when the date cannot be parsed.
func getHorodate(fields *[][]byte) (*time.Time, error) {
	f := *fields
	if len(f) < 2 {
		return nil, nil
	}
	if len(f) != 4 && string(f[0]) != "DATE" {
		return nil, nil
	}

	stamp := f[1]
	if len(stamp) != 13 {
		return nil, nil
	}

	horodate, err := time.Parse("060102150405", string(stamp[1:]))
	if err != nil {
		return nil, err
	}

	// Handle "saison"
	offset := -1 * time.Hour
	if stamp[0] == 'E' || stamp[0] == 'e' {
		offset = -2 * time.Hour
	}
	horodate = horodate.Add(offset)

	// Mark field as treated
	*fields = append(f[:1], f[2:]...)

	return &horodate, nil
}

// treatLine validates and splits one line of a frame.
//
// The last byte of the line is its checksum, computed over everything that
// precedes it. It returns the key (first field), the timestamp when the line
// carries one, and the remaining fields joined by single spaces. Empty lines
// and lines with a bad checksum (which is logged) yield an empty key and a
// nil error; err is only set when the timestamp cannot be parsed.
func treatLine(line []byte) (key string, horodate *time.Time, data []byte, err error) {
	// Only strip leading whitespace and trailing line terminators: the
	// checksum byte may itself be a space (0x20) and must be preserved.
	line = bytes.TrimLeftFunc(line, unicode.IsSpace)
	line = bytes.TrimRight(line, "\r\n")
	if len(line) <= 1 {
		return "", nil, nil, nil
	}

	payload, received := line[:len(line)-1], line[len(line)-1:]
	if expected := computeChecksum(payload); expected != received[0] {
		log.Printf("BAD checksum on %s: calculated: %c\n", line, expected)
		return "", nil, nil, nil
	}

	// Split the payload only, then put the checksum back as the last field so
	// that a space checksum is not swallowed by the field splitter.
	fields := append(bytes.Fields(payload), received)
	key = string(fields[0])

	horodate, err = getHorodate(&fields)
	if err != nil {
		return key, nil, nil, err
	}
	if len(fields) < 2 {
		return key, horodate, nil, nil
	}

	// Everything between the key and the checksum is the data.
	data = bytes.Join(fields[1:len(fields)-1], []byte{' '})
	return key, horodate, data, nil
}

func main() {
	var influxScheme = flag.String("influx-scheme", "http", "Scheme to use to contact InfluxDB")
	var influxHost = flag.String("influx-host", "localhost", "Host where lives InfluxDB")
	var influxPort = flag.Uint("influx-port", 8086, "Port where InfluxDB is accessible")
	var influxUser = flag.String("influx-username", "influx", "Username to use to perform authentication")
	var influxPass = flag.String("influx-password", "", "Password to use to perform authentication")
	flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
	flag.Parse()

	if flag.NArg() < 1 {
		// A missing device is a usage error: exit with a non-zero status.
		log.Fatal("missing required argument: serial device (eg. /dev/ttyUSB0)")
	}

	// NOTE(review): the port is opened with 7 data bits and no parity;
	// confirm this matches the framing of the connected meter.
	config := &serial.Config{
		Name:     flag.Arg(0),
		Baud:     9600,
		Size:     7,
		Parity:   serial.ParityNone,
		StopBits: 1,
	}

	s, err := serial.OpenPort(config)
	if err != nil {
		log.Fatal(err)
	}

	client := influxdb2.NewClient(
		fmt.Sprintf("%s://%s:%d", *influxScheme, *influxHost, *influxPort),
		fmt.Sprintf("%s:%s", *influxUser, *influxPass),
	)

	frames := make(chan []byte)

	go readSerial(s, frames)
	go treatFrames(frames, client)

	// Run until interrupted, then release the InfluxDB client.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
	<-interrupt

	client.Close()
}