package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"strconv"
	"time"

	"github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
)

// InfluxDB connection settings and tagging options. Each variable carries its
// default value and is bound to a command-line flag in init.
var (
	influxScheme            = "http"
	influxHost              = "localhost"
	influxPort         uint = 8086
	influxOrganization      = ""
	influxBucket            = "teleinfo"
	influxToken             = ""
	myZone                  = ""
)

// init registers the command-line flags backing the package-level settings.
func init() {
	flag.StringVar(&influxScheme, "influx-scheme", influxScheme, "Scheme to use to contact InfluxDB")
	flag.StringVar(&influxHost, "influx-host", influxHost, "Host where lives InfluxDB")
	flag.UintVar(&influxPort, "influx-port", influxPort, "Port where InfluxDB is accessible")
	flag.StringVar(&influxOrganization, "influx-organization", influxOrganization, "Organization to use")
	flag.StringVar(&influxBucket, "influx-bucket", influxBucket, "Bucket where store data")
	flag.StringVar(&influxToken, "influx-token", influxToken, "Token use to perform authentication")
	flag.StringVar(&myZone, "tag-zone", myZone, "Tag zone to add to points")
}

// InfluxWriter converts decoded points into InfluxDB points and writes them
// through a blocking write API. Init must be called before AddPoints.
type InfluxWriter struct {
	client   influxdb2.Client
	writeAPI api.WriteAPIBlocking
	myZone   string
}

// Init builds the InfluxDB client and blocking write API from the
// package-level settings and records the configured zone tag.
// It currently always returns nil.
func (w *InfluxWriter) Init() error {
	w.client = influxdb2.NewClient(fmt.Sprintf("%s://%s:%d", influxScheme, influxHost, influxPort), influxToken)
	w.writeAPI = w.client.WriteAPIBlocking(influxOrganization, influxBucket)
	w.myZone = myZone

	return nil
}

// Close releases the underlying InfluxDB client. It is a no-op when Init has
// not been called, so it is safe to defer unconditionally.
func (w *InfluxWriter) Close() {
	if w.client == nil {
		return
	}
	w.client.Close()
}

// defaultTags returns a fresh tag map shared by every point: "zone" when a
// zone is configured, and "host" when os.Hostname succeeds.
func (w *InfluxWriter) defaultTags() map[string]string {
	tags := map[string]string{}

	if w.myZone != "" {
		tags["zone"] = w.myZone
	}

	// A hostname lookup failure only means the tag is omitted.
	if host, err := os.Hostname(); err == nil {
		tags["host"] = host
	}

	return tags
}

// fillUnit sets the "unit" tag to the MeasurementUnits entry for key, if any.
// tags must point to a non-nil map.
func fillUnit(tags *map[string]string, key string) {
	if unit, ok := MeasurementUnits[key]; ok {
		(*tags)["unit"] = unit
	}
}

// getValue returns the field value stored for key. Keys listed in
// MeasurementUnits are parsed as integers; all other keys keep the raw text.
func getValue(key string, data Point) interface{} {
	raw := string(data.Data)

	if _, numeric := MeasurementUnits[key]; !numeric {
		return raw
	}

	// The parse error is deliberately discarded: an unparsable payload is
	// stored as 0 so the field keeps an integer type.
	n, _ := strconv.Atoi(raw)
	return n
}

// AddPoints converts m into InfluxDB points stamped with defaultHorodate and
// writes them in a single blocking call. It returns nil without contacting
// the server when m produces no points, and wraps any write error.
func (w *InfluxWriter) AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) {
	points := w.genMeasurements(m, defaultHorodate)
	if len(points) == 0 {
		return nil
	}

	if err = w.writeAPI.WritePoint(context.Background(), points...); err != nil {
		return fmt.Errorf("writing %d points to bucket %q: %w", len(points), influxBucket, err)
	}

	return nil
}

// genMeasurements builds the points to write for m, all stamped with
// defaultHorodate:
//   - one point per MeasurementGroups entry that has at least one key present
//     in m, with one field per present key;
//   - one single-field point per MeasurementUnits key present in m that does
//     not belong to any group;
//   - one single-field point per MeasurementStrings key present in m.
func (w *InfluxWriter) genMeasurements(m map[string]Point, defaultHorodate time.Time) (points []*write.Point) {
	// Keys owned by a group; they must not be emitted again on their own.
	grouped := map[string]struct{}{}

	for name, keys := range MeasurementGroups {
		fields := map[string]interface{}{}
		for _, k := range keys {
			grouped[k] = struct{}{}
			if v, ok := m[k]; ok {
				fields[k] = getValue(k, v)
			}
		}

		if len(fields) == 0 {
			continue
		}

		// The whole group is tagged with the unit of its first key.
		tags := w.defaultTags()
		fillUnit(&tags, keys[0])
		points = append(points, influxdb2.NewPoint(name, tags, fields, defaultHorodate))
	}

	for k := range MeasurementUnits {
		v, ok := m[k]
		if !ok {
			continue
		}
		if _, done := grouped[k]; done {
			continue
		}
		points = append(points, w.singleValuePoint(k, v, defaultHorodate))
	}

	for _, k := range MeasurementStrings {
		v, ok := m[k]
		if !ok {
			continue
		}
		points = append(points, w.singleValuePoint(k, v, defaultHorodate))
	}

	return points
}

// singleValuePoint builds a point named key holding one "value" field, tagged
// with the default tags plus the unit of key when one is known.
func (w *InfluxWriter) singleValuePoint(key string, data Point, ts time.Time) *write.Point {
	tags := w.defaultTags()
	fillUnit(&tags, key)

	return influxdb2.NewPoint(key, tags, map[string]interface{}{"value": getValue(key, data)}, ts)
}