linky2influx/influxWriter.go

160 lines
3.5 KiB
Go

package main
import (
"context"
"flag"
"fmt"
"os"
"strconv"
"time"
"github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
)
var (
influxScheme = "http"
influxHost = "localhost"
influxPort uint = 8086
influxOrganization = ""
influxBucket = "teleinfo"
influxToken = ""
myZone = ""
)
func init() {
flag.StringVar(&influxScheme, "influx-scheme", influxScheme, "Scheme to use to contact InfluxDB")
flag.StringVar(&influxHost, "influx-host", influxHost, "Host where lives InfluxDB")
flag.UintVar(&influxPort, "influx-port", influxPort, "Port where InfluxDB is accessible")
flag.StringVar(&influxOrganization, "influx-organization", influxOrganization, "Organization to use")
flag.StringVar(&influxBucket, "influx-bucket", influxBucket, "Bucket where store data")
flag.StringVar(&influxToken, "influx-token", influxToken, "Token use to perform authentication")
flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
}
type InfluxWriter struct {
client influxdb2.Client
writeAPI api.WriteAPIBlocking
myZone string
}
func (w *InfluxWriter) Init() error {
w.client = influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", influxScheme, influxHost, influxPort), influxToken)
w.writeAPI = w.client.WriteAPIBlocking(influxOrganization, influxBucket)
w.myZone = myZone
return nil
}
func (w *InfluxWriter) Close() {
w.client.Close()
}
func (w *InfluxWriter) defaultTags() map[string]string {
ret := map[string]string{}
if w.myZone != "" {
ret["zone"] = w.myZone
}
if host, err := os.Hostname(); err == nil {
ret["host"] = host
}
return ret
}
func fillUnit(tags *map[string]string, key string) {
if v, ok := MeasurementUnits[key]; ok {
(*tags)["unit"] = v
}
}
func getValue(key string, data Point) interface{} {
if _, ok := MeasurementUnits[key]; ok {
ret, _ := strconv.Atoi(string(data.Data))
return ret
} else {
return string(data.Data)
}
}
func (w *InfluxWriter) AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) {
points := w.genMeasurements(m, defaultHorodate)
err = w.writeAPI.WritePoint(context.Background(), points...)
return
}
func (w *InfluxWriter) genMeasurements(m map[string]Point, defaultHorodate time.Time) (points []*write.Point) {
measurmentdone := []string{}
// Treat MeasurementGroups
for kgrp, grp := range MeasurementGroups {
fields := map[string]interface{}{}
for _, k := range grp {
measurmentdone = append(measurmentdone, k)
if v, ok := m[k]; ok {
fields[k] = getValue(k, v)
}
}
if len(fields) == 0 {
continue
}
tags := w.defaultTags()
fillUnit(&tags, grp[0])
points = append(points, influxdb2.NewPoint(kgrp,
tags,
fields,
defaultHorodate,
))
}
// Treat int
munits:
for k := range MeasurementUnits {
if _, ok := m[k]; !ok {
continue
}
for _, mg := range measurmentdone {
if mg == k {
continue munits
}
}
tags := w.defaultTags()
fillUnit(&tags, k)
points = append(points, influxdb2.NewPoint(k,
tags,
map[string]interface{}{"value": getValue(k, m[k])},
defaultHorodate,
))
}
// Treat string
for _, k := range MeasurementStrings {
if _, ok := m[k]; !ok {
continue
}
tags := w.defaultTags()
fillUnit(&tags, k)
points = append(points, influxdb2.NewPoint(k,
tags,
map[string]interface{}{"value": getValue(k, m[k])},
defaultHorodate,
))
}
return
}