160 lines
3.5 KiB
Go
160 lines
3.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
|
"github.com/influxdata/influxdb-client-go/v2/api/write"
|
|
)
|
|
|
|
// Settings used to reach InfluxDB and to tag the written points. The values
// below are the defaults; init exposes each of them as a command-line flag.
var (
	// influxScheme is the URL scheme used to contact InfluxDB.
	influxScheme = "http"

	// influxHost is the host part of the InfluxDB URL.
	influxHost = "localhost"

	// influxPort is the port part of the InfluxDB URL.
	influxPort uint = 8086

	// influxOrganization is the organization handed to the write API.
	influxOrganization = ""

	// influxBucket is the bucket handed to the write API.
	influxBucket = "teleinfo"

	// influxToken is the authentication token handed to the client.
	influxToken = ""

	// myZone, when non-empty, is attached to every point as the "zone" tag.
	myZone = ""
)
|
|
|
|
func init() {
|
|
flag.StringVar(&influxScheme, "influx-scheme", influxScheme, "Scheme to use to contact InfluxDB")
|
|
flag.StringVar(&influxHost, "influx-host", influxHost, "Host where lives InfluxDB")
|
|
flag.UintVar(&influxPort, "influx-port", influxPort, "Port where InfluxDB is accessible")
|
|
flag.StringVar(&influxOrganization, "influx-organization", influxOrganization, "Organization to use")
|
|
flag.StringVar(&influxBucket, "influx-bucket", influxBucket, "Bucket where store data")
|
|
flag.StringVar(&influxToken, "influx-token", influxToken, "Token use to perform authentication")
|
|
flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
|
|
}
|
|
|
|
type InfluxWriter struct {
|
|
client influxdb2.Client
|
|
writeAPI api.WriteAPIBlocking
|
|
myZone string
|
|
}
|
|
|
|
func (w *InfluxWriter) Init() error {
|
|
w.client = influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", influxScheme, influxHost, influxPort), influxToken)
|
|
w.writeAPI = w.client.WriteAPIBlocking(influxOrganization, influxBucket)
|
|
w.myZone = myZone
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *InfluxWriter) Close() {
|
|
w.client.Close()
|
|
}
|
|
|
|
func (w *InfluxWriter) defaultTags() map[string]string {
|
|
ret := map[string]string{}
|
|
|
|
if w.myZone != "" {
|
|
ret["zone"] = w.myZone
|
|
}
|
|
|
|
if host, err := os.Hostname(); err == nil {
|
|
ret["host"] = host
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
func fillUnit(tags *map[string]string, key string) {
|
|
if v, ok := MeasurementUnits[key]; ok {
|
|
(*tags)["unit"] = v
|
|
}
|
|
}
|
|
|
|
func getValue(key string, data Point) interface{} {
|
|
if _, ok := MeasurementUnits[key]; ok {
|
|
ret, _ := strconv.Atoi(string(data.Data))
|
|
return ret
|
|
} else {
|
|
return string(data.Data)
|
|
}
|
|
}
|
|
|
|
func (w *InfluxWriter) AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) {
|
|
points := w.genMeasurements(m, defaultHorodate)
|
|
|
|
err = w.writeAPI.WritePoint(context.Background(), points...)
|
|
return
|
|
}
|
|
|
|
func (w *InfluxWriter) genMeasurements(m map[string]Point, defaultHorodate time.Time) (points []*write.Point) {
|
|
measurmentdone := []string{}
|
|
|
|
// Treat MeasurementGroups
|
|
for kgrp, grp := range MeasurementGroups {
|
|
fields := map[string]interface{}{}
|
|
|
|
for _, k := range grp {
|
|
measurmentdone = append(measurmentdone, k)
|
|
|
|
if v, ok := m[k]; ok {
|
|
fields[k] = getValue(k, v)
|
|
}
|
|
}
|
|
|
|
if len(fields) == 0 {
|
|
continue
|
|
}
|
|
|
|
tags := w.defaultTags()
|
|
fillUnit(&tags, grp[0])
|
|
|
|
points = append(points, influxdb2.NewPoint(kgrp,
|
|
tags,
|
|
fields,
|
|
defaultHorodate,
|
|
))
|
|
}
|
|
|
|
// Treat int
|
|
munits:
|
|
for k := range MeasurementUnits {
|
|
if _, ok := m[k]; !ok {
|
|
continue
|
|
}
|
|
|
|
for _, mg := range measurmentdone {
|
|
if mg == k {
|
|
continue munits
|
|
}
|
|
}
|
|
|
|
tags := w.defaultTags()
|
|
fillUnit(&tags, k)
|
|
|
|
points = append(points, influxdb2.NewPoint(k,
|
|
tags,
|
|
map[string]interface{}{"value": getValue(k, m[k])},
|
|
defaultHorodate,
|
|
))
|
|
}
|
|
|
|
// Treat string
|
|
for _, k := range MeasurementStrings {
|
|
if _, ok := m[k]; !ok {
|
|
continue
|
|
}
|
|
|
|
tags := w.defaultTags()
|
|
fillUnit(&tags, k)
|
|
|
|
points = append(points, influxdb2.NewPoint(k,
|
|
tags,
|
|
map[string]interface{}{"value": getValue(k, m[k])},
|
|
defaultHorodate,
|
|
))
|
|
}
|
|
|
|
return
|
|
}
|