remote-challenge-sync-airbus: Add inotify watcher

This commit is contained in:
nemunaire 2022-06-07 17:04:07 +02:00
parent 367e686e8a
commit cc1b212cca
3 changed files with 149 additions and 71 deletions

View File

@ -2,12 +2,14 @@ package main
import ( import (
"flag" "flag"
"io/ioutil"
"log" "log"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time"
"gopkg.in/fsnotify.v1"
) )
var ( var (
@ -17,9 +19,9 @@ var (
func main() { func main() {
flag.StringVar(&TeamsDir, "teams", "./TEAMS", "Base directory where save teams JSON files") flag.StringVar(&TeamsDir, "teams", "./TEAMS", "Base directory where save teams JSON files")
//var debugINotify = flag.Bool("debuginotify", false, "Show skipped inotofy events") var debugINotify = flag.Bool("debuginotify", false, "Show skipped inotofy events")
flag.BoolVar(&skipInitialSync, "skipinitialsync", skipInitialSync, "Skip the initial synchronization") flag.BoolVar(&skipInitialSync, "skipinitialsync", skipInitialSync, "Skip the initial synchronization")
//watcher := flag.Bool("watch", false, "Enable daemon mode by watching the directory") daemon := flag.Bool("watch", false, "Enable daemon mode by watching the directory")
tspath := flag.String("timestamp-file", "./REMOTE/timestamp", "Path to the file storing the last timestamp") tspath := flag.String("timestamp-file", "./REMOTE/timestamp", "Path to the file storing the last timestamp")
exercicespath := flag.String("exercices-file", "./REMOTE/exercices-bindings.json", "Path to the file containing the ID bindings") exercicespath := flag.String("exercices-file", "./REMOTE/exercices-bindings.json", "Path to the file containing the ID bindings")
coeff := flag.Float64("global-coeff", 10.0, "Coefficient to use to multiply all scores before passing them to the other platform") coeff := flag.Float64("global-coeff", 10.0, "Coefficient to use to multiply all scores before passing them to the other platform")
@ -66,24 +68,94 @@ func main() {
} }
w := Walker{ w := Walker{
LastSync: *ts, LastSync: ts,
Exercices: exbindings, Exercices: exbindings,
Teams: teamsbindings, Teams: teamsbindings,
API: api, API: api,
Coeff: *coeff, Coeff: *coeff,
} }
if !skipInitialSync {
// Iterate over teams scores // Iterate over teams scores
err = filepath.WalkDir(TeamsDir, w.WalkScore) err = filepath.WalkDir(TeamsDir, w.WalkScore)
if err != nil { if err != nil {
log.Printf("Something goes wrong during walking") log.Printf("Something goes wrong during walking")
} }
// Update timestamp for the next time // save current timestamp for teams
w.LastSync = time.Now() err = saveTS(*tspath, w.LastSync)
err = saveTS(*tspath, &w.LastSync)
if err != nil { if err != nil {
log.Fatal("Unable to save timestamp file: ", err.Error()) log.Fatal("Unable to save timestamp file: ", err.Error())
} }
} }
if daemon != nil && *daemon {
// Watch teams.json and scores.json
log.Println("Registering directory events...")
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
if err := watchsubdir(watcher, TeamsDir); err != nil {
log.Fatal(err)
}
watchedNotify := fsnotify.Create
for {
select {
case ev := <-watcher.Events:
if d, err := os.Lstat(ev.Name); err == nil && ev.Op&fsnotify.Create == fsnotify.Create && d.Mode().IsDir() && d.Mode()&os.ModeSymlink == 0 && d.Name() != ".tmp" {
// Register new subdirectory
if err := watchsubdir(watcher, ev.Name); err != nil {
log.Println(err)
}
} else if ev.Op&watchedNotify == watchedNotify && d.Mode().IsRegular() {
if *debugINotify {
log.Println("Treating event:", ev, "for", ev.Name)
}
if filepath.Base(ev.Name) == "scores.json" {
go w.treat(ev.Name)
} else if filepath.Base(ev.Name) == "teams.json" {
teamsbindings, err := getTeams(filepath.Join(TeamsDir, "teams.json"))
if err != nil {
log.Println("Unable to open teams bindings file: ", err.Error())
return
}
w.Teams = teamsbindings
}
} else if ev.Op&fsnotify.Write == fsnotify.Write {
log.Println("FSNOTIFY WRITE SEEN. Prefer looking at them, as it appears files are not atomically moved.")
watchedNotify = fsnotify.Write
} else if *debugINotify {
log.Println("Skipped event:", ev, "for", ev.Name)
}
case err := <-watcher.Errors:
log.Println("error:", err)
}
}
}
}
func watchsubdir(watcher *fsnotify.Watcher, pathname string) error {
log.Println("Watch new directory:", pathname)
if err := watcher.Add(pathname); err != nil {
return err
}
if ds, err := ioutil.ReadDir(pathname); err != nil {
return err
} else {
for _, d := range ds {
p := path.Join(pathname, d.Name())
if d.IsDir() && d.Name() != ".tmp" && d.Mode()&os.ModeSymlink == 0 {
if err := watchsubdir(watcher, p); err != nil {
return err
}
}
}
return nil
}
}

View File

@ -1,50 +1,42 @@
package main package main
import ( import (
"fmt" "encoding/json"
"os" "os"
"time" "time"
) )
func loadTS(tspath string) (timestamp *time.Time, err error) { func loadTS(tspath string) (timestamp map[AirbusUserId]time.Time, err error) {
var fd *os.File
if _, err = os.Stat(tspath); os.IsNotExist(err) { if _, err = os.Stat(tspath); os.IsNotExist(err) {
init := time.Unix(0, 0) timestamp = map[AirbusUserId]time.Time{}
timestamp = &init
err = saveTS(tspath, timestamp) err = saveTS(tspath, timestamp)
return return
} } else if fd, err = os.Open(tspath); err != nil {
return nil, err
var fd *os.File } else {
fd, err = os.Open(tspath)
if err != nil {
return
}
defer fd.Close() defer fd.Close()
jdec := json.NewDecoder(fd)
var init int64 if err = jdec.Decode(&timestamp); err != nil {
_, err = fmt.Fscanf(fd, "%d", &init)
if err != nil {
return return
} }
tmp := time.Unix(init, 0)
timestamp = &tmp
return return
} }
}
func saveTS(tspath string, ts *time.Time) error { func saveTS(tspath string, ts map[AirbusUserId]time.Time) error {
fd, err := os.Create(tspath) if fd, err := os.Create(tspath); err != nil {
if err != nil {
return err return err
} } else {
defer fd.Close() defer fd.Close()
jenc := json.NewEncoder(fd)
_, err = fmt.Fprintf(fd, "%d", ts.Unix()) if err := jenc.Encode(ts); err != nil {
if err != nil {
return err return err
} }
return nil return nil
} }
}

View File

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"log"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@ -10,27 +11,28 @@ import (
) )
type Walker struct { type Walker struct {
LastSync time.Time LastSync map[AirbusUserId]time.Time
Exercices AirbusExercicesBindings Exercices AirbusExercicesBindings
Teams map[string]fic.ExportedTeam Teams map[string]fic.ExportedTeam
API AirbusAPI API AirbusAPI
Coeff float64 Coeff float64
} }
func (w *Walker) WalkScore(path string, d os.DirEntry, err error) error { func (w *Walker) treat(path string) {
if filepath.Base(path) == "scores.json" {
mypath := filepath.Join(filepath.Dir(path), "my.json") mypath := filepath.Join(filepath.Dir(path), "my.json")
if _, err := os.Stat(mypath); !os.IsNotExist(err) { if _, err := os.Stat(mypath); !os.IsNotExist(err) {
// Read team ID // Read team ID
fdmy, err := os.Open(mypath) fdmy, err := os.Open(mypath)
if err != nil { if err != nil {
return err log.Println("Unable to open my.json:", err)
return
} }
defer fdmy.Close() defer fdmy.Close()
teammy, err := fic.ReadMyJSON(fdmy) teammy, err := fic.ReadMyJSON(fdmy)
if err != nil { if err != nil {
return err log.Println("Unable to parse my.json:", err)
return
} }
airbusTeamId := NewAirbusUserId(w.Teams[fmt.Sprintf("%d", teammy.Id)].ExternalId) airbusTeamId := NewAirbusUserId(w.Teams[fmt.Sprintf("%d", teammy.Id)].ExternalId)
@ -38,17 +40,23 @@ func (w *Walker) WalkScore(path string, d os.DirEntry, err error) error {
// Treat score grid // Treat score grid
err = w.TreatScoreGrid(path, airbusTeamId) err = w.TreatScoreGrid(path, airbusTeamId)
if err != nil { if err != nil {
return err log.Println("Unable to treat score grid:", err)
return
} }
// Balance scores // Balance scores
err = w.BalanceScore(int64(float64(teammy.Points)*w.Coeff), airbusTeamId) err = w.BalanceScore(int64(float64(teammy.Points)*w.Coeff), airbusTeamId)
if err != nil { if err != nil {
return err log.Println("Unable to balance score:", err)
return
} }
} }
} }
func (w *Walker) WalkScore(path string, d os.DirEntry, err error) error {
if filepath.Base(path) == "scores.json" {
go w.treat(path)
}
return nil return nil
} }
@ -66,8 +74,12 @@ func (w *Walker) TreatScoreGrid(path string, airbusTeamId AirbusUserId) error {
} }
// Found all new entries // Found all new entries
maxts := w.LastSync[airbusTeamId]
for _, row := range teamscores { for _, row := range teamscores {
if row.Time.After(w.LastSync) { if row.Time.After(maxts) {
maxts = row.Time
}
if row.Time.After(w.LastSync[airbusTeamId]) {
if row.Reason == "Validation" { if row.Reason == "Validation" {
err = w.API.ValidateChallengeFromUser(airbusTeamId, w.Exercices[row.IdExercice]) err = w.API.ValidateChallengeFromUser(airbusTeamId, w.Exercices[row.IdExercice])
} else { } else {
@ -80,6 +92,8 @@ func (w *Walker) TreatScoreGrid(path string, airbusTeamId AirbusUserId) error {
} }
} }
w.LastSync[airbusTeamId] = maxts
return nil return nil
} }