package main import ( "context" "fmt" "log" "net/http" "strings" "sync" "time" "github.com/gin-gonic/gin" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" ) var ( OffsetQuestionTimer uint = 700 WSClients = map[int64][]WSClient{} WSClientsMutex = sync.RWMutex{} WSAdmin = []WSClient{} WSAdminMutex = sync.RWMutex{} ) func declareAPIAuthDirectRoutes(router *gin.RouterGroup) { router.GET("/ws", SurveyWS) } func declareAPIAdminDirectRoutes(router *gin.RouterGroup) { router.GET("/ws-admin", SurveyWSAdmin) router.GET("/ws/stats", func(c *gin.Context) { s := c.MustGet("survey").(*Survey) c.JSON(http.StatusOK, WSSurveyStats(s.Id)) }) } func WSSurveyStats(sid int64) map[string]interface{} { var users []map[string]interface{} var nb int WSClientsMutex.RLock() defer WSClientsMutex.RUnlock() if w, ok := WSClients[sid]; ok { nb = len(w) for _, ws := range w { users = append(users, map[string]interface{}{ "id": ws.u.Id, "login": ws.u.Login, }) } } return map[string]interface{}{ "nb_clients": nb, "users": users, } } type WSClient struct { ws *websocket.Conn c chan WSMessage u *User sid int64 } func SurveyWS_run(ws *websocket.Conn, c chan WSMessage, sid int64, u *User) { for { msg, ok := <-c if !ok { break } ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() err := wsjson.Write(ctx, ws, msg) if err != nil { log.Println("Error on WebSocket:", err) ws.Close(websocket.StatusInternalError, "error on write") break } } ws.Close(websocket.StatusNormalClosure, "end") WSClientsMutex.Lock() defer WSClientsMutex.Unlock() for i, clt := range WSClients[sid] { if clt.ws == ws { WSClients[sid][i] = WSClients[sid][len(WSClients[sid])-1] WSClients[sid] = WSClients[sid][:len(WSClients[sid])-1] break } } log.Println(u.Login, "disconnected") } func msgCurrentState(survey *Survey) (msg WSMessage) { if *survey.Direct == 0 { msg = WSMessage{ Action: "pause", } } else { var correction map[string]int if survey.Corrected { correction = getCorrectionString(*survey.Direct) } msg = WSMessage{ Action: "new_question", QuestionId: survey.Direct, Corrected: survey.Corrected, Corrections: correction, } } return } func SurveyWS(c *gin.Context) { u := c.MustGet("LoggedUser").(*User) survey := c.MustGet("survey").(*Survey) if survey.Direct == nil { c.AbortWithStatusJSON(http.StatusPaymentRequired, gin.H{"errmsg": "Not a live survey"}) return } ws, err := websocket.Accept(c.Writer, c.Request, nil) if err != nil { log.Fatal("error get connection", err) } log.Println(u.Login, "is now connected to WS", survey.Id) ch := make(chan WSMessage, 1) WSClientsMutex.Lock() defer WSClientsMutex.Unlock() WSClients[survey.Id] = append(WSClients[survey.Id], WSClient{ws, ch, u, survey.Id}) // Send current state ch <- msgCurrentState(survey) go SurveyWS_run(ws, ch, survey.Id, u) go func(c chan WSMessage, sid int) { var v WSMessage var err error for { err = wsjson.Read(context.Background(), ws, &v) if err != nil { log.Println("Error when receiving message:", err) close(c) break } if v.Action == "myscroll" { v.UserId = &u.Id v.SurveyId = &survey.Id WSAdminWriteAll(v) } else { log.Println("Unknown ws response:", v.Action) } } }(ch, int(survey.Id)) } func WSWriteAll(message WSMessage) { WSClientsMutex.RLock() defer WSClientsMutex.RUnlock() for _, wss := range WSClients { for _, ws := range wss { ws.c <- message } } } type WSMessage struct { Action string `json:"action"` SurveyId *int64 `json:"survey,omitempty"` QuestionId *int64 `json:"question,omitempty"` Stats map[string]interface{} `json:"stats,omitempty"` UserId *int64 `json:"user,omitempty"` Response string `json:"value,omitempty"` Corrected bool `json:"corrected,omitempty"` Corrections map[string]int `json:"corrections,omitempty"` Timer uint `json:"timer,omitempty"` } func (s *Survey) WSWriteAll(message WSMessage) { WSClientsMutex.RLock() defer WSClientsMutex.RUnlock() if wss, ok := WSClients[s.Id]; ok { for _, ws := range wss { ws.c <- message } } } func (s *Survey) WSCloseAll(message string) { WSClientsMutex.RLock() defer WSClientsMutex.RUnlock() if wss, ok := WSClients[s.Id]; ok { for _, ws := range wss { close(ws.c) } } } // Admin ////////////////////////////////////////////////////////////// func SurveyWSAdmin_run(ctx context.Context, ws *websocket.Conn, c chan WSMessage, sid int64, u *User) { ct := time.Tick(25000 * time.Millisecond) loopadmin: for { select { case <-ct: ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() err := wsjson.Write(ctx, ws, WSMessage{ Action: "stats", Stats: WSSurveyStats(sid), }) if err != nil { log.Println("Error on WebSocket:", err) ws.Close(websocket.StatusInternalError, "error on write") break } case msg, ok := <-c: if !ok { break loopadmin } ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() err := wsjson.Write(ctx, ws, msg) if err != nil { log.Println("Error on WebSocket:", err) ws.Close(websocket.StatusInternalError, "error on write") break } } } ws.Close(websocket.StatusNormalClosure, "end") WSAdminMutex.Lock() defer WSAdminMutex.Unlock() for i, clt := range WSAdmin { if clt.ws == ws { WSAdmin[i] = WSAdmin[len(WSAdmin)-1] WSAdmin = WSAdmin[:len(WSAdmin)-1] break } } log.Println(u.Login, "admin disconnected") } func getCorrectionString(qid int64) (ret map[string]int) { q, err := getQuestion(int(qid)) if err != nil { return } cts, err := q.GetCorrectionTemplates() if err != nil { return } ret = map[string]int{} for _, ct := range cts { ret[ct.RegExp] = ct.Score } return } func getResponsesStats(qid int64) map[string]interface{} { q, err := getQuestion(int(qid)) if err != nil { return nil } responses, err := q.GetResponses() if err != nil { log.Println("Unable to retrieve responses:", err) return nil } labels := []string{} values := []uint{} if q.Kind == "mcq" || q.Kind == "ucq" { proposals, err := q.GetProposals() if err != nil { return nil } proposal_idx := map[string]int{} for _, p := range proposals { proposal_idx[fmt.Sprintf("%d", p.Id)] = len(labels) labels = append(labels, p.Label) values = append(values, 0) } for _, r := range responses { for _, v := range strings.Split(r.Answer, ",") { values[proposal_idx[v]]++ } } } else { stats := map[string]uint{} for _, r := range responses { stats[r.Answer]++ } for k, v := range stats { labels = append(labels, k) values = append(values, v) } } return map[string]interface{}{ "labels": labels, "datasets": []map[string][]uint{ map[string][]uint{ "values": values, }, }, } } func SurveyWSAdmin(c *gin.Context) { u := c.MustGet("LoggedUser").(*User) survey := c.MustGet("survey").(*Survey) if survey.Direct == nil { c.AbortWithStatusJSON(http.StatusPaymentRequired, gin.H{"errmsg": "Not a live survey"}) return } ws, err := websocket.Accept(c.Writer, c.Request, nil) if err != nil { log.Fatal("error get connection", err) } log.Println(u.Login, "is now connected to WS-admin", survey.Id) ch := make(chan WSMessage, 2) WSAdminMutex.Lock() defer WSAdminMutex.Unlock() WSAdmin = append(WSAdmin, WSClient{ws, ch, u, survey.Id}) // Send current state ch <- msgCurrentState(survey) go SurveyWSAdmin_run(c.Request.Context(), ws, ch, survey.Id, u) go func(c chan WSMessage, sid int) { var v WSMessage var err error var surveyTimer *time.Timer for { // Reset variable state v.Corrected = false v.Corrections = nil v.Timer = 0 err = wsjson.Read(context.Background(), ws, &v) if err != nil { log.Println("Error when receiving message:", err) close(c) break } if v.Action == "new_question" && v.QuestionId != nil { if survey, err := getSurvey(sid); err != nil { log.Println("Unable to retrieve survey:", err) } else { // Skip any existing scheduled timer if surveyTimer != nil { if !surveyTimer.Stop() { <-surveyTimer.C } surveyTimer = nil } survey.Direct = v.QuestionId if v.Timer > 0 { survey.Corrected = false survey.Update() // Save corrected state for the callback corrected := v.Corrected with_stats := v.Stats != nil surveyTimer = time.AfterFunc(time.Duration(OffsetQuestionTimer+v.Timer)*time.Millisecond, func() { surveyTimer = nil if corrected { survey.Corrected = v.Corrected survey.Update() var stats map[string]interface{} if with_stats { stats = getResponsesStats(*v.QuestionId) } survey.WSWriteAll(WSMessage{Action: "new_question", QuestionId: v.QuestionId, Corrected: true, Stats: stats, Corrections: getCorrectionString(*v.QuestionId)}) } else { var z int64 = 0 survey.Direct = &z survey.Update() survey.WSWriteAll(WSMessage{Action: "pause"}) WSAdminWriteAll(WSMessage{Action: "pause", SurveyId: &survey.Id}) } }) v.Corrected = false v.Stats = nil } else { survey.Corrected = v.Corrected if v.Corrected { v.Corrections = getCorrectionString(*v.QuestionId) if v.Stats != nil { v.Stats = getResponsesStats(*v.QuestionId) } else { v.Stats = nil } } else { v.Stats = nil } } _, err = survey.Update() if err != nil { log.Println("Unable to update survey:", err) } survey.WSWriteAll(v) v.SurveyId = &survey.Id WSAdminWriteAll(v) } } else if v.Action == "pause" { if survey, err := getSurvey(sid); err != nil { log.Println("Unable to retrieve survey:", err) } else { var u int64 = 0 survey.Direct = &u _, err = survey.Update() if err != nil { log.Println("Unable to update survey:", err) } survey.WSWriteAll(v) v.SurveyId = &survey.Id WSAdminWriteAll(v) } } else if v.Action == "end" { if survey, err := getSurvey(sid); err != nil { log.Println("Unable to retrieve survey:", err) } else { survey.EndAvailability = time.Now() survey.Direct = nil _, err = survey.Update() if err != nil { log.Println("Unable to update survey:", err) } survey.WSCloseAll("Fin du live") v.SurveyId = &survey.Id WSAdminWriteAll(v) } } else if v.Action == "get_stats" { err = wsjson.Write(context.Background(), ws, WSMessage{Action: "stats", Stats: WSSurveyStats(int64(sid))}) } else if v.Action == "get_responses" { if survey, err := getSurvey(sid); err != nil { log.Println("Unable to retrieve survey:", err) } else if questions, err := survey.GetQuestions(); err != nil { log.Println("Unable to retrieve questions:", err) } else { for _, q := range questions { if responses, err := q.GetResponses(); err != nil { log.Println("Unable to retrieve questions:", err) } else { for _, r := range responses { wsjson.Write(context.Background(), ws, WSMessage{Action: "new_response", UserId: &r.IdUser, QuestionId: &q.Id, Response: r.Answer}) } } } } } else if v.Action == "get_asks" { if survey, err := getSurvey(sid); err != nil { log.Println("Unable to retrieve survey:", err) } else if asks, err := survey.GetAsks(v.Response == ""); err != nil { log.Println("Unable to retrieve asks:", err) } else { for _, a := range asks { wsjson.Write(context.Background(), ws, WSMessage{Action: "new_ask", UserId: &a.IdUser, QuestionId: &a.Id, Response: a.Content}) } } } else if v.Action == "mark_answered" && v.QuestionId != nil { if asks, err := GetAsk(int(*v.QuestionId)); err != nil { log.Println("Unable to retrieve ask:", err) } else { asks.Answered = true err = asks.Update() if err != nil { log.Println("Unable to update:", err) } } } else if v.Action == "mark_answered" && v.Response == "all" { if survey, err := getSurvey(sid); err != nil { log.Println("Unable to retrieve survey:", err) } else if asks, err := survey.GetAsks(v.Response == ""); err != nil { log.Println("Unable to retrieve asks:", err) } else { for _, ask := range asks { ask.Answered = true err = ask.Update() if err != nil { log.Println("Unable to update:", err) } } } } else if v.Action == "where_are_you" { survey.WSWriteAll(v) } else { log.Println("Unknown admin action:", v.Action) } } }(ch, int(survey.Id)) } func WSAdminWriteAll(message WSMessage) { WSAdminMutex.RLock() defer WSAdminMutex.RUnlock() for _, ws := range WSAdmin { ws.c <- message } } func (s *Survey) WSAdminWriteAll(message WSMessage) { WSAdminMutex.RLock() defer WSAdminMutex.RUnlock() for _, ws := range WSAdmin { if ws.sid == s.Id { ws.c <- message } } }