package main import ( "bytes" "flag" "fmt" "io" "log" "os" "os/signal" "syscall" "time" "unicode" "github.com/tarm/serial" ) func readSerial(s *serial.Port, c chan []byte) { var unread bytes.Buffer buf := make([]byte, 128) for { n, err := s.Read(buf) if err != nil { log.Fatal(err) } unread.Write(buf[:n]) for { frame, err := unread.ReadBytes(2) if err == io.EOF { if _, err = unread.Write(frame); err != nil { log.Println(err) } break } else if err != nil { log.Println(err) break } c <- bytes.TrimRightFunc(frame, unicode.IsControl) } } } type Point struct { Data []byte Horodate *time.Time } func treatFrames(frames chan []byte, writer TICWriter, legacyMode bool) { first := true fframe: for { frame := <-frames // Skip the first frame because it's not complete if first { first = false continue } points := map[string]Point{} var defaultHorodate time.Time for _, line := range bytes.Split(frame, []byte("\r\n")) { key, horodate, data, err := treatLine(line, legacyMode) if err != nil { log.Println(err) continue } // Replace legacy keys if legacyMode { if nkey, ok := Legacy2Std[key]; ok { key = nkey } } // Skip ADCO, this is the Linky address, confidential and unrelevant if key == "ADCO" || key == "ADSC" || key == "PRM" { continue } if key == "DATE" { if horodate == nil { continue fframe } defaultHorodate = *horodate continue } points[key] = Point{ Data: data, Horodate: horodate, } } err := writer.AddPoints(points, defaultHorodate) if err != nil { log.Println("Unable to write points:", err) } } } func computeChecksum(area []byte) (checksum byte) { for _, c := range area { checksum += c } checksum = (checksum & 0x3F) + 0x20 return } func getHorodate(fields *[][]byte) (*time.Time, error) { if (len(*fields) == 4 || string((*fields)[0]) == "DATE") && len((*fields)[1]) == 13 { horodate, err := time.Parse("060102150405", string((*fields)[1][1:])) if err != nil { return nil, err } // Handle "saison" if (*fields)[1][0] == 'E' || (*fields)[1][0] == 'e' { horodate = horodate.Add(-2 * time.Hour) } else { horodate = horodate.Add(-1 * time.Hour) } // Mark field as treated *fields = append((*fields)[:1], (*fields)[2:]...) return &horodate, nil } return nil, nil } func treatLine(line []byte, legacyMode bool) (key string, horodate *time.Time, data []byte, err error) { line = bytes.TrimSpace(line) if len(line) <= 1 { return } if computeChecksum(line[:len(line)-1]) != line[len(line)-1] { // Try checksum mode 1 if !legacyMode || computeChecksum(line[:len(line)-2]) != line[len(line)-1] { log.Printf("BAD checksum on %s: calculated: %c\n", line, computeChecksum(line[:len(line)-1])) return } } fields := bytes.Fields(line) if len(fields) < 2 { err = fmt.Errorf("Invalid number of fields in line") return } key = string(fields[0]) horodate, err = getHorodate(&fields) if err != nil { return } data = bytes.Join(fields[1:len(fields)-1], []byte{' '}) return } type TICWriter interface { Init() error Close() AddPoints(m map[string]Point, defaultHorodate time.Time) (err error) } func main() { var legacyMode = flag.Bool("legacy-mode", false, "Assume teleinformation in legacy mode") var pushFrequency = flag.Bool("push-frequency", false, "Also fetch data about the grid frequency") flag.Parse() parseEnvironmentVariables() if len(flag.Args()) < 1 { log.Println("missing required argument: serial device (eg. /dev/ttyUSB0)") return } serialSpeed := 9600 parity := serial.ParityNone if *legacyMode { serialSpeed = 1200 parity = serial.ParityOdd } config := &serial.Config{ Name: flag.Args()[0], Baud: serialSpeed, Size: 7, Parity: parity, StopBits: 1, } s, err := serial.OpenPort(config) if err != nil { log.Fatal(err) } var writer TICWriter writer = &InfluxWriter{} err = writer.Init() if err != nil { log.Fatal("Unable to initialize the writer:", err) } log.Printf("linky2influxdb ready, listen to %s at %d baud", config.Name, config.Baud) frames := make(chan []byte) go readSerial(s, frames) go treatFrames(frames, writer, *legacyMode) interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) if *pushFrequency { ticker := time.NewTicker(25 * time.Second) defer ticker.Stop() frequencyloop: for { select { case <-ticker.C: freq, err := FetchFrequency() if err != nil { log.Println("An error occurs during FetchFrequency: ", err.Error()) continue frequencyloop } err = WriteFrequency(writer, freq) if err != nil { log.Println("An error occurs during WriteFrequency: ", err.Error()) } case <-interrupt: break frequencyloop } } } else { <-interrupt } writer.Close() }