diff --git a/influxWriter.go b/influxWriter.go new file mode 100644 index 0000000..e572baa --- /dev/null +++ b/influxWriter.go @@ -0,0 +1,159 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "strconv" + "time" + + "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" +) + +var ( + influxScheme = "http" + influxHost = "localhost" + influxPort uint = 8086 + influxOrganization = "" + influxBucket = "teleinfo" + influxToken = "" + myZone = "" +) + +func init() { + flag.StringVar(&influxScheme, "influx-scheme", influxScheme, "Scheme to use to contact InfluxDB") + flag.StringVar(&influxHost, "influx-host", influxHost, "Host where lives InfluxDB") + flag.UintVar(&influxPort, "influx-port", influxPort, "Port where InfluxDB is accessible") + flag.StringVar(&influxOrganization, "influx-organization", influxOrganization, "Organization to use") + flag.StringVar(&influxBucket, "influx-bucket", influxBucket, "Bucket where store data") + flag.StringVar(&influxToken, "influx-token", influxToken, "Token use to perform authentication") + flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points") +} + +type InfluxWriter struct { + client influxdb2.Client + writeAPI api.WriteAPIBlocking + myZone string +} + +func (w *InfluxWriter) Init() error { + w.client = influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", influxScheme, influxHost, influxPort), influxToken) + w.writeAPI = w.client.WriteAPIBlocking(influxOrganization, influxBucket) + w.myZone = myZone + + return nil +} + +func (w *InfluxWriter) Close() { + w.client.Close() +} + +func (w *InfluxWriter) defaultTags() map[string]string { + ret := map[string]string{} + + if w.myZone != "" { + ret["zone"] = w.myZone + } + + if host, err := os.Hostname(); err == nil { + ret["host"] = host + } + + return ret +} + +func fillUnit(tags *map[string]string, key string) { + if v, ok := MeasurementUnits[key]; ok { + (*tags)["unit"] = v + } +} + +func getValue(key string, data Point) interface{} { + if _, ok := MeasurementUnits[key]; ok { + ret, _ := strconv.Atoi(string(data.Data)) + return ret + } else { + return string(data.Data) + } +} + +func (w *InfluxWriter) AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) { + points := w.genMeasurements(m, defaultHorodate) + + err = w.writeAPI.WritePoint(context.Background(), points...) + return +} + +func (w *InfluxWriter) genMeasurements(m map[string]Point, defaultHorodate time.Time) (points []*write.Point) { + measurmentdone := []string{} + + // Treat MeasurementGroups + for kgrp, grp := range MeasurementGroups { + fields := map[string]interface{}{} + + for _, k := range grp { + measurmentdone = append(measurmentdone, k) + + if v, ok := m[k]; ok { + fields[k] = getValue(k, v) + } + } + + if len(fields) == 0 { + continue + } + + tags := w.defaultTags() + fillUnit(&tags, grp[0]) + + points = append(points, influxdb2.NewPoint(kgrp, + tags, + fields, + defaultHorodate, + )) + } + + // Treat int +munits: + for k := range MeasurementUnits { + if _, ok := m[k]; !ok { + continue + } + + for _, mg := range measurmentdone { + if mg == k { + continue munits + } + } + + tags := w.defaultTags() + fillUnit(&tags, k) + + points = append(points, influxdb2.NewPoint(k, + tags, + map[string]interface{}{"value": getValue(k, m[k])}, + defaultHorodate, + )) + } + + // Treat string + for _, k := range MeasurementStrings { + if _, ok := m[k]; !ok { + continue + } + + tags := w.defaultTags() + fillUnit(&tags, k) + + points = append(points, influxdb2.NewPoint(k, + tags, + map[string]interface{}{"value": getValue(k, m[k])}, + defaultHorodate, + )) + } + + return +} diff --git a/main.go b/main.go index 6999575..a0953d0 100644 --- a/main.go +++ b/main.go @@ -2,25 +2,18 @@ package main import ( "bytes" - "context" "flag" - "fmt" "io" "log" "os" "os/signal" - "strconv" "syscall" "time" "unicode" - "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/tarm/serial" ) -var myZone = "" - func readSerial(s *serial.Port, c chan []byte) { var unread bytes.Buffer buf := make([]byte, 128) @@ -34,9 +27,9 @@ func readSerial(s *serial.Port, c chan []byte) { unread.Write(buf[:n]) for { - line, err := unread.ReadBytes(2) + frame, err := unread.ReadBytes(2) if err == io.EOF { - if _, err = unread.Write(line); err != nil { + if _, err = unread.Write(frame); err != nil { log.Println(err) } break @@ -45,114 +38,18 @@ func readSerial(s *serial.Port, c chan []byte) { break } - c <- bytes.TrimRightFunc(line, unicode.IsControl) + c <- bytes.TrimRightFunc(frame, unicode.IsControl) } } } -func defaultTags() map[string]string { - ret := map[string]string{} - - if myZone != "" { - ret["zone"] = myZone - } - - if host, err := os.Hostname(); err == nil { - ret["host"] = host - } - - return ret +type Point struct { + Data []byte + Horodate *time.Time } -func fillUnit(tags *map[string]string, key string) { - if v, ok := MeasurementUnits[key]; ok { - (*tags)["unit"] = v - } -} - -func getValue(key string, data []byte) interface{} { - if _, ok := MeasurementUnits[key]; ok { - ret, _ := strconv.Atoi(string(data)) - return ret - } else { - return string(data) - } -} - -func genMeasurements(m map[string][]byte, horodate time.Time) (points []*write.Point) { - measurmentdone := []string{} - - // Treat MeasurementGroups - for kgrp, grp := range MeasurementGroups { - fields := map[string]interface{}{} - - for _, k := range grp { - measurmentdone = append(measurmentdone, k) - - if v, ok := m[k]; ok { - fields[k] = getValue(k, v) - } - } - - if len(fields) == 0 { - continue - } - - tags := defaultTags() - fillUnit(&tags, grp[0]) - - points = append(points, influxdb2.NewPoint(kgrp, - tags, - fields, - horodate, - )) - } - - // Treat int -munits: - for k := range MeasurementUnits { - if _, ok := m[k]; !ok { - continue - } - - for _, mg := range measurmentdone { - if mg == k { - continue munits - } - } - - tags := defaultTags() - fillUnit(&tags, k) - - points = append(points, influxdb2.NewPoint(k, - tags, - map[string]interface{}{"value": getValue(k, m[k])}, - horodate, - )) - } - - // Treat string - for _, k := range MeasurementStrings { - if _, ok := m[k]; !ok { - continue - } - - tags := defaultTags() - fillUnit(&tags, k) - - points = append(points, influxdb2.NewPoint(k, - tags, - map[string]interface{}{"value": getValue(k, m[k])}, - horodate, - )) - } - - return -} - -func treatFrames(frames chan []byte, client influxdb2.Client, org string, bucket string) { +func treatFrames(frames chan []byte, writer TICWriter) { first := true - writeAPI := client.WriteAPIBlocking(org, bucket) fframe: for { frame := <-frames @@ -163,12 +60,10 @@ fframe: continue } - points := []*write.Point{} + points := map[string]Point{} var defaultHorodate time.Time - m := map[string][]byte{} - for _, line := range bytes.Split(frame, []byte("\r\n")) { key, horodate, data, err := treatLine(line) @@ -190,23 +85,13 @@ fframe: continue } - if horodate == nil { - m[key] = data - } else { - tags := defaultTags() - fillUnit(&tags, key) - - points = append(points, influxdb2.NewPoint(key, - tags, - map[string]interface{}{"value": getValue(key, data)}, - *horodate, - )) + points[key] = Point{ + Data: data, + Horodate: horodate, } } - points = append(points, genMeasurements(m, defaultHorodate)...) - - err := writeAPI.WritePoint(context.Background(), points...) + err := writer.AddPoints(points, defaultHorodate) if err != nil { log.Println("Unable to write points:", err) } @@ -272,15 +157,14 @@ func treatLine(line []byte) (key string, horodate *time.Time, data []byte, err e return } +type TICWriter interface { + Init() error + Close() + AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) +} + func main() { var legacyMode = flag.Bool("legacy-mode", false, "Assume teleinformation in legacy mode") - var influxScheme = flag.String("influx-scheme", "http", "Scheme to use to contact InfluxDB") - var influxHost = flag.String("influx-host", "localhost", "Host where lives InfluxDB") - var influxPort = flag.Uint("influx-port", 8086, "Port where InfluxDB is accessible") - var influxOrganization = flag.String("influx-organization", "", "Organization to use") - var influxBucket = flag.String("influx-bucket", "teleinfo", "Bucket where store data") - var influxToken = flag.String("influx-token", "", "Token use to perform authentication") - flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points") flag.Parse() if len(flag.Args()) < 1 { @@ -289,15 +173,17 @@ func main() { } serialSpeed := 9600 + parity := serial.ParityNone if *legacyMode { serialSpeed = 1200 + parity = serial.ParityOdd } config := &serial.Config{ Name: flag.Args()[0], Baud: serialSpeed, Size: 7, - Parity: serial.ParityNone, + Parity: parity, StopBits: 1, } @@ -306,15 +192,22 @@ func main() { log.Fatal(err) } - client := influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", *influxScheme, *influxHost, *influxPort), *influxToken) + var writer TICWriter + writer = &InfluxWriter{} + err = writer.Init() + if err != nil { + log.Fatal("Unable to initialize the writer:", err) + } + + log.Printf("linky2influxdb ready, listen to %s at %d baud", config.Name, config.Baud) frames := make(chan []byte) go readSerial(s, frames) - go treatFrames(frames, client, *influxOrganization, *influxBucket) + go treatFrames(frames, writer) interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) <-interrupt - client.Close() + writer.Close() }