linky2influx/main.go

257 lines
4.9 KiB
Go

package main
import (
"bytes"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"
"time"
"unicode"
"github.com/tarm/serial"
)
func readSerial(s *serial.Port, c chan []byte) {
var unread bytes.Buffer
buf := make([]byte, 128)
for {
n, err := s.Read(buf)
if err != nil {
log.Fatal(err)
}
unread.Write(buf[:n])
for {
frame, err := unread.ReadBytes(2)
if err == io.EOF {
if _, err = unread.Write(frame); err != nil {
log.Println(err)
}
break
} else if err != nil {
log.Println(err)
break
}
c <- bytes.TrimRightFunc(frame, unicode.IsControl)
}
}
}
type Point struct {
Data []byte
Horodate *time.Time
}
func treatFrames(frames chan []byte, writer TICWriter, legacyMode bool) {
first := true
fframe:
for {
frame := <-frames
// Skip the first frame because it's not complete
if first {
first = false
continue
}
points := map[string]Point{}
var defaultHorodate time.Time
for _, line := range bytes.Split(frame, []byte("\r\n")) {
key, horodate, data, err := treatLine(line, legacyMode)
if err != nil {
log.Println(err)
continue
}
// Replace legacy keys
if legacyMode {
if nkey, ok := Legacy2Std[key]; ok {
key = nkey
}
}
// Skip ADCO, this is the Linky address, confidential and unrelevant
if key == "ADCO" || key == "ADSC" || key == "PRM" {
continue
}
if key == "DATE" {
if horodate == nil {
continue fframe
}
defaultHorodate = *horodate
continue
}
points[key] = Point{
Data: data,
Horodate: horodate,
}
}
err := writer.AddPoints(points, defaultHorodate)
if err != nil {
log.Println("Unable to write points:", err)
}
}
}
func computeChecksum(area []byte) (checksum byte) {
for _, c := range area {
checksum += c
}
checksum = (checksum & 0x3F) + 0x20
return
}
func getHorodate(fields *[][]byte) (*time.Time, error) {
if (len(*fields) == 4 || string((*fields)[0]) == "DATE") && len((*fields)[1]) == 13 {
horodate, err := time.Parse("060102150405", string((*fields)[1][1:]))
if err != nil {
return nil, err
}
// Handle "saison"
if (*fields)[1][0] == 'E' || (*fields)[1][0] == 'e' {
horodate = horodate.Add(-2 * time.Hour)
} else {
horodate = horodate.Add(-1 * time.Hour)
}
// Mark field as treated
*fields = append((*fields)[:1], (*fields)[2:]...)
return &horodate, nil
}
return nil, nil
}
func treatLine(line []byte, legacyMode bool) (key string, horodate *time.Time, data []byte, err error) {
line = bytes.TrimSpace(line)
if len(line) <= 1 {
return
}
if computeChecksum(line[:len(line)-1]) != line[len(line)-1] {
// Try checksum mode 1
if !legacyMode || computeChecksum(line[:len(line)-2]) != line[len(line)-1] {
log.Printf("BAD checksum on %s: calculated: %c\n", line, computeChecksum(line[:len(line)-1]))
return
}
}
fields := bytes.Fields(line)
if len(fields) < 2 {
err = fmt.Errorf("Invalid number of fields in line")
return
}
key = string(fields[0])
horodate, err = getHorodate(&fields)
if err != nil {
return
}
data = bytes.Join(fields[1:len(fields)-1], []byte{' '})
return
}
type TICWriter interface {
Init() error
Close()
AddPoints(m map[string]Point, defaultHorodate time.Time) (err error)
}
func main() {
var legacyMode = flag.Bool("legacy-mode", false, "Assume teleinformation in legacy mode")
var pushFrequency = flag.Bool("push-frequency", false, "Also fetch data about the grid frequency")
flag.Parse()
parseEnvironmentVariables()
if len(flag.Args()) < 1 {
log.Println("missing required argument: serial device (eg. /dev/ttyUSB0)")
return
}
serialSpeed := 9600
parity := serial.ParityNone
if *legacyMode {
serialSpeed = 1200
parity = serial.ParityOdd
}
config := &serial.Config{
Name: flag.Args()[0],
Baud: serialSpeed,
Size: 7,
Parity: parity,
StopBits: 1,
}
s, err := serial.OpenPort(config)
if err != nil {
log.Fatal(err)
}
var writer TICWriter
writer = &InfluxWriter{}
err = writer.Init()
if err != nil {
log.Fatal("Unable to initialize the writer:", err)
}
log.Printf("linky2influxdb ready, listen to %s at %d baud", config.Name, config.Baud)
frames := make(chan []byte)
go readSerial(s, frames)
go treatFrames(frames, writer, *legacyMode)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
if *pushFrequency {
ticker := time.NewTicker(25 * time.Second)
defer ticker.Stop()
frequencyloop:
for {
select {
case <-ticker.C:
freq, err := FetchFrequency()
if err != nil {
log.Println("An error occurs during FetchFrequency: ", err.Error())
continue frequencyloop
}
err = WriteFrequency(writer, freq)
if err != nil {
log.Println("An error occurs during WriteFrequency: ", err.Error())
}
case <-interrupt:
break frequencyloop
}
}
} else {
<-interrupt
}
writer.Close()
}