Refactor code to extract Writer

This commit is contained in:
nemunaire 2022-08-20 12:02:14 +02:00
parent cfc9a8d0c7
commit ba5c097cf0
2 changed files with 190 additions and 138 deletions

159
influxWriter.go Normal file
View File

@ -0,0 +1,159 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"strconv"
"time"
"github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
var (
influxScheme = "http"
influxHost = "localhost"
influxPort uint = 8086
influxOrganization = ""
influxBucket = "teleinfo"
influxToken = ""
myZone = ""
)
func init() {
flag.StringVar(&influxScheme, "influx-scheme", influxScheme, "Scheme to use to contact InfluxDB")
flag.StringVar(&influxHost, "influx-host", influxHost, "Host where lives InfluxDB")
flag.UintVar(&influxPort, "influx-port", influxPort, "Port where InfluxDB is accessible")
flag.StringVar(&influxOrganization, "influx-organization", influxOrganization, "Organization to use")
flag.StringVar(&influxBucket, "influx-bucket", influxBucket, "Bucket where store data")
flag.StringVar(&influxToken, "influx-token", influxToken, "Token use to perform authentication")
flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
}
type InfluxWriter struct {
client influxdb2.Client
writeAPI api.WriteAPIBlocking
myZone string
}
func (w *InfluxWriter) Init() error {
w.client = influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", influxScheme, influxHost, influxPort), influxToken)
w.writeAPI = w.client.WriteAPIBlocking(influxOrganization, influxBucket)
w.myZone = myZone
return nil
}
func (w *InfluxWriter) Close() {
w.client.Close()
}
func (w *InfluxWriter) defaultTags() map[string]string {
ret := map[string]string{}
if w.myZone != "" {
ret["zone"] = w.myZone
}
if host, err := os.Hostname(); err == nil {
ret["host"] = host
}
return ret
}
func fillUnit(tags *map[string]string, key string) {
if v, ok := MeasurementUnits[key]; ok {
(*tags)["unit"] = v
}
}
func getValue(key string, data Point) interface{} {
if _, ok := MeasurementUnits[key]; ok {
ret, _ := strconv.Atoi(string(data.Data))
return ret
} else {
return string(data.Data)
}
}
func (w *InfluxWriter) AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) {
points := w.genMeasurements(m, defaultHorodate)
err = w.writeAPI.WritePoint(context.Background(), points...)
return
}
func (w *InfluxWriter) genMeasurements(m map[string]Point, defaultHorodate time.Time) (points []*write.Point) {
measurmentdone := []string{}
// Treat MeasurementGroups
for kgrp, grp := range MeasurementGroups {
fields := map[string]interface{}{}
for _, k := range grp {
measurmentdone = append(measurmentdone, k)
if v, ok := m[k]; ok {
fields[k] = getValue(k, v)
}
}
if len(fields) == 0 {
continue
}
tags := w.defaultTags()
fillUnit(&tags, grp[0])
points = append(points, influxdb2.NewPoint(kgrp,
tags,
fields,
defaultHorodate,
))
}
// Treat int
munits:
for k := range MeasurementUnits {
if _, ok := m[k]; !ok {
continue
}
for _, mg := range measurmentdone {
if mg == k {
continue munits
}
}
tags := w.defaultTags()
fillUnit(&tags, k)
points = append(points, influxdb2.NewPoint(k,
tags,
map[string]interface{}{"value": getValue(k, m[k])},
defaultHorodate,
))
}
// Treat string
for _, k := range MeasurementStrings {
if _, ok := m[k]; !ok {
continue
}
tags := w.defaultTags()
fillUnit(&tags, k)
points = append(points, influxdb2.NewPoint(k,
tags,
map[string]interface{}{"value": getValue(k, m[k])},
defaultHorodate,
))
}
return
}

169
main.go
View File

@ -2,25 +2,18 @@ package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"unicode"
"github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/tarm/serial"
)
var myZone = ""
func readSerial(s *serial.Port, c chan []byte) {
var unread bytes.Buffer
buf := make([]byte, 128)
@ -34,9 +27,9 @@ func readSerial(s *serial.Port, c chan []byte) {
unread.Write(buf[:n])
for {
line, err := unread.ReadBytes(2)
frame, err := unread.ReadBytes(2)
if err == io.EOF {
if _, err = unread.Write(line); err != nil {
if _, err = unread.Write(frame); err != nil {
log.Println(err)
}
break
@ -45,114 +38,18 @@ func readSerial(s *serial.Port, c chan []byte) {
break
}
c <- bytes.TrimRightFunc(line, unicode.IsControl)
c <- bytes.TrimRightFunc(frame, unicode.IsControl)
}
}
}
func defaultTags() map[string]string {
ret := map[string]string{}
if myZone != "" {
ret["zone"] = myZone
type Point struct {
Data []byte
Horodate *time.Time
}
if host, err := os.Hostname(); err == nil {
ret["host"] = host
}
return ret
}
func fillUnit(tags *map[string]string, key string) {
if v, ok := MeasurementUnits[key]; ok {
(*tags)["unit"] = v
}
}
func getValue(key string, data []byte) interface{} {
if _, ok := MeasurementUnits[key]; ok {
ret, _ := strconv.Atoi(string(data))
return ret
} else {
return string(data)
}
}
func genMeasurements(m map[string][]byte, horodate time.Time) (points []*write.Point) {
measurmentdone := []string{}
// Treat MeasurementGroups
for kgrp, grp := range MeasurementGroups {
fields := map[string]interface{}{}
for _, k := range grp {
measurmentdone = append(measurmentdone, k)
if v, ok := m[k]; ok {
fields[k] = getValue(k, v)
}
}
if len(fields) == 0 {
continue
}
tags := defaultTags()
fillUnit(&tags, grp[0])
points = append(points, influxdb2.NewPoint(kgrp,
tags,
fields,
horodate,
))
}
// Treat int
munits:
for k := range MeasurementUnits {
if _, ok := m[k]; !ok {
continue
}
for _, mg := range measurmentdone {
if mg == k {
continue munits
}
}
tags := defaultTags()
fillUnit(&tags, k)
points = append(points, influxdb2.NewPoint(k,
tags,
map[string]interface{}{"value": getValue(k, m[k])},
horodate,
))
}
// Treat string
for _, k := range MeasurementStrings {
if _, ok := m[k]; !ok {
continue
}
tags := defaultTags()
fillUnit(&tags, k)
points = append(points, influxdb2.NewPoint(k,
tags,
map[string]interface{}{"value": getValue(k, m[k])},
horodate,
))
}
return
}
func treatFrames(frames chan []byte, client influxdb2.Client, org string, bucket string) {
func treatFrames(frames chan []byte, writer TICWriter) {
first := true
writeAPI := client.WriteAPIBlocking(org, bucket)
fframe:
for {
frame := <-frames
@ -163,12 +60,10 @@ fframe:
continue
}
points := []*write.Point{}
points := map[string]Point{}
var defaultHorodate time.Time
m := map[string][]byte{}
for _, line := range bytes.Split(frame, []byte("\r\n")) {
key, horodate, data, err := treatLine(line)
@ -190,23 +85,13 @@ fframe:
continue
}
if horodate == nil {
m[key] = data
} else {
tags := defaultTags()
fillUnit(&tags, key)
points = append(points, influxdb2.NewPoint(key,
tags,
map[string]interface{}{"value": getValue(key, data)},
*horodate,
))
points[key] = Point{
Data: data,
Horodate: horodate,
}
}
points = append(points, genMeasurements(m, defaultHorodate)...)
err := writeAPI.WritePoint(context.Background(), points...)
err := writer.AddPoints(points, defaultHorodate)
if err != nil {
log.Println("Unable to write points:", err)
}
@ -272,15 +157,14 @@ func treatLine(line []byte) (key string, horodate *time.Time, data []byte, err e
return
}
type TICWriter interface {
Init() error
Close()
AddPoints(m map[string]Point, defaultHorodate time.Time) (err error)
}
func main() {
var legacyMode = flag.Bool("legacy-mode", false, "Assume teleinformation in legacy mode")
var influxScheme = flag.String("influx-scheme", "http", "Scheme to use to contact InfluxDB")
var influxHost = flag.String("influx-host", "localhost", "Host where lives InfluxDB")
var influxPort = flag.Uint("influx-port", 8086, "Port where InfluxDB is accessible")
var influxOrganization = flag.String("influx-organization", "", "Organization to use")
var influxBucket = flag.String("influx-bucket", "teleinfo", "Bucket where store data")
var influxToken = flag.String("influx-token", "", "Token use to perform authentication")
flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
flag.Parse()
if len(flag.Args()) < 1 {
@ -289,15 +173,17 @@ func main() {
}
serialSpeed := 9600
parity := serial.ParityNone
if *legacyMode {
serialSpeed = 1200
parity = serial.ParityOdd
}
config := &serial.Config{
Name: flag.Args()[0],
Baud: serialSpeed,
Size: 7,
Parity: serial.ParityNone,
Parity: parity,
StopBits: 1,
}
@ -306,15 +192,22 @@ func main() {
log.Fatal(err)
}
client := influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", *influxScheme, *influxHost, *influxPort), *influxToken)
var writer TICWriter
writer = &InfluxWriter{}
err = writer.Init()
if err != nil {
log.Fatal("Unable to initialize the writer:", err)
}
log.Printf("linky2influxdb ready, listen to %s at %d baud", config.Name, config.Baud)
frames := make(chan []byte)
go readSerial(s, frames)
go treatFrames(frames, client, *influxOrganization, *influxBucket)
go treatFrames(frames, writer)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
<-interrupt
client.Close()
writer.Close()
}