This repository has been archived on 2024-03-28. You can view files and clone it, but cannot push or open issues or pull requests.
atsebay.t/direct.go

560 lines
13 KiB
Go

package main
import (
"context"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
var (
OffsetQuestionTimer uint = 700
WSClients = map[int64][]WSClient{}
WSClientsMutex = sync.RWMutex{}
WSAdmin = []WSClient{}
WSAdminMutex = sync.RWMutex{}
)
func declareAPIAuthDirectRoutes(router *gin.RouterGroup) {
router.GET("/ws", SurveyWS)
}
func declareAPIAdminDirectRoutes(router *gin.RouterGroup) {
router.GET("/ws-admin", SurveyWSAdmin)
router.GET("/ws/stats", func(c *gin.Context) {
s := c.MustGet("survey").(*Survey)
c.JSON(http.StatusOK, WSSurveyStats(s.Id))
})
}
func WSSurveyStats(sid int64) map[string]interface{} {
var users []map[string]interface{}
var nb int
WSClientsMutex.RLock()
defer WSClientsMutex.RUnlock()
if w, ok := WSClients[sid]; ok {
nb = len(w)
for _, ws := range w {
users = append(users, map[string]interface{}{
"id": ws.u.Id,
"login": ws.u.Login,
})
}
}
return map[string]interface{}{
"nb_clients": nb,
"users": users,
}
}
type WSClient struct {
ws *websocket.Conn
c chan WSMessage
u *User
sid int64
}
func SurveyWS_run(ws *websocket.Conn, c chan WSMessage, sid int64, u *User) {
for {
msg, ok := <-c
if !ok {
break
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
err := wsjson.Write(ctx, ws, msg)
if err != nil {
log.Println("Error on WebSocket:", err)
ws.Close(websocket.StatusInternalError, "error on write")
break
}
}
ws.Close(websocket.StatusNormalClosure, "end")
WSClientsMutex.Lock()
defer WSClientsMutex.Unlock()
for i, clt := range WSClients[sid] {
if clt.ws == ws {
WSClients[sid][i] = WSClients[sid][len(WSClients[sid])-1]
WSClients[sid] = WSClients[sid][:len(WSClients[sid])-1]
break
}
}
log.Println(u.Login, "disconnected")
}
func msgCurrentState(survey *Survey) (msg WSMessage) {
if *survey.Direct == 0 {
msg = WSMessage{
Action: "pause",
}
} else {
var correction map[string]int
if survey.Corrected {
correction = getCorrectionString(*survey.Direct)
}
msg = WSMessage{
Action: "new_question",
QuestionId: survey.Direct,
Corrected: survey.Corrected,
Corrections: correction,
}
}
return
}
func SurveyWS(c *gin.Context) {
u := c.MustGet("LoggedUser").(*User)
survey := c.MustGet("survey").(*Survey)
if survey.Direct == nil {
c.AbortWithStatusJSON(http.StatusPaymentRequired, gin.H{"errmsg": "Not a live survey"})
return
}
ws, err := websocket.Accept(c.Writer, c.Request, nil)
if err != nil {
log.Fatal("error get connection", err)
}
log.Println(u.Login, "is now connected to WS", survey.Id)
ch := make(chan WSMessage, 1)
WSClientsMutex.Lock()
defer WSClientsMutex.Unlock()
WSClients[survey.Id] = append(WSClients[survey.Id], WSClient{ws, ch, u, survey.Id})
// Send current state
ch <- msgCurrentState(survey)
go SurveyWS_run(ws, ch, survey.Id, u)
go func(c chan WSMessage, sid int) {
var v WSMessage
var err error
for {
err = wsjson.Read(context.Background(), ws, &v)
if err != nil {
log.Println("Error when receiving message:", err)
close(c)
break
}
if v.Action == "myscroll" {
v.UserId = &u.Id
v.SurveyId = &survey.Id
WSAdminWriteAll(v)
} else {
log.Println("Unknown ws response:", v.Action)
}
}
}(ch, int(survey.Id))
}
func WSWriteAll(message WSMessage) {
WSClientsMutex.RLock()
defer WSClientsMutex.RUnlock()
for _, wss := range WSClients {
for _, ws := range wss {
ws.c <- message
}
}
}
type WSMessage struct {
Action string `json:"action"`
SurveyId *int64 `json:"survey,omitempty"`
QuestionId *int64 `json:"question,omitempty"`
Stats map[string]interface{} `json:"stats,omitempty"`
UserId *int64 `json:"user,omitempty"`
Response string `json:"value,omitempty"`
Corrected bool `json:"corrected,omitempty"`
Corrections map[string]int `json:"corrections,omitempty"`
Timer uint `json:"timer,omitempty"`
}
func (s *Survey) WSWriteAll(message WSMessage) {
WSClientsMutex.RLock()
defer WSClientsMutex.RUnlock()
if wss, ok := WSClients[s.Id]; ok {
for _, ws := range wss {
ws.c <- message
}
}
}
func (s *Survey) WSCloseAll(message string) {
WSClientsMutex.RLock()
defer WSClientsMutex.RUnlock()
if wss, ok := WSClients[s.Id]; ok {
for _, ws := range wss {
close(ws.c)
}
}
}
// Admin //////////////////////////////////////////////////////////////
func SurveyWSAdmin_run(ctx context.Context, ws *websocket.Conn, c chan WSMessage, sid int64, u *User) {
ct := time.Tick(25000 * time.Millisecond)
loopadmin:
for {
select {
case <-ct:
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
err := wsjson.Write(ctx, ws, WSMessage{
Action: "stats",
Stats: WSSurveyStats(sid),
})
if err != nil {
log.Println("Error on WebSocket:", err)
ws.Close(websocket.StatusInternalError, "error on write")
break
}
case msg, ok := <-c:
if !ok {
break loopadmin
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
err := wsjson.Write(ctx, ws, msg)
if err != nil {
log.Println("Error on WebSocket:", err)
ws.Close(websocket.StatusInternalError, "error on write")
break
}
}
}
ws.Close(websocket.StatusNormalClosure, "end")
WSAdminMutex.Lock()
defer WSAdminMutex.Unlock()
for i, clt := range WSAdmin {
if clt.ws == ws {
WSAdmin[i] = WSAdmin[len(WSAdmin)-1]
WSAdmin = WSAdmin[:len(WSAdmin)-1]
break
}
}
log.Println(u.Login, "admin disconnected")
}
func getCorrectionString(qid int64) (ret map[string]int) {
q, err := getQuestion(int(qid))
if err != nil {
return
}
cts, err := q.GetCorrectionTemplates()
if err != nil {
return
}
ret = map[string]int{}
for _, ct := range cts {
ret[ct.RegExp] = ct.Score
}
return
}
func getResponsesStats(qid int64) map[string]interface{} {
q, err := getQuestion(int(qid))
if err != nil {
return nil
}
responses, err := q.GetResponses()
if err != nil {
log.Println("Unable to retrieve responses:", err)
return nil
}
labels := []string{}
values := []uint{}
if q.Kind == "mcq" || q.Kind == "ucq" {
proposals, err := q.GetProposals()
if err != nil {
return nil
}
proposal_idx := map[string]int{}
for _, p := range proposals {
proposal_idx[fmt.Sprintf("%d", p.Id)] = len(labels)
labels = append(labels, p.Label)
values = append(values, 0)
}
for _, r := range responses {
for _, v := range strings.Split(r.Answer, ",") {
values[proposal_idx[v]]++
}
}
} else {
stats := map[string]uint{}
for _, r := range responses {
stats[r.Answer]++
}
for k, v := range stats {
labels = append(labels, k)
values = append(values, v)
}
}
return map[string]interface{}{
"labels": labels,
"datasets": []map[string][]uint{
map[string][]uint{
"values": values,
},
},
}
}
func SurveyWSAdmin(c *gin.Context) {
u := c.MustGet("LoggedUser").(*User)
survey := c.MustGet("survey").(*Survey)
if survey.Direct == nil {
c.AbortWithStatusJSON(http.StatusPaymentRequired, gin.H{"errmsg": "Not a live survey"})
return
}
ws, err := websocket.Accept(c.Writer, c.Request, nil)
if err != nil {
log.Fatal("error get connection", err)
}
log.Println(u.Login, "is now connected to WS-admin", survey.Id)
ch := make(chan WSMessage, 2)
WSAdminMutex.Lock()
defer WSAdminMutex.Unlock()
WSAdmin = append(WSAdmin, WSClient{ws, ch, u, survey.Id})
// Send current state
ch <- msgCurrentState(survey)
go SurveyWSAdmin_run(c.Request.Context(), ws, ch, survey.Id, u)
go func(c chan WSMessage, sid int) {
var v WSMessage
var err error
var surveyTimer *time.Timer
for {
// Reset variable state
v.Corrected = false
v.Corrections = nil
v.Timer = 0
err = wsjson.Read(context.Background(), ws, &v)
if err != nil {
log.Println("Error when receiving message:", err)
close(c)
break
}
if v.Action == "new_question" && v.QuestionId != nil {
if survey, err := getSurvey(sid); err != nil {
log.Println("Unable to retrieve survey:", err)
} else {
// Skip any existing scheduled timer
if surveyTimer != nil {
if !surveyTimer.Stop() {
<-surveyTimer.C
}
surveyTimer = nil
}
survey.Direct = v.QuestionId
if v.Timer > 0 {
survey.Corrected = false
survey.Update()
// Save corrected state for the callback
corrected := v.Corrected
with_stats := v.Stats != nil
surveyTimer = time.AfterFunc(time.Duration(OffsetQuestionTimer+v.Timer)*time.Millisecond, func() {
surveyTimer = nil
if corrected {
survey.Corrected = v.Corrected
survey.Update()
var stats map[string]interface{}
if with_stats {
stats = getResponsesStats(*v.QuestionId)
}
survey.WSWriteAll(WSMessage{Action: "new_question", QuestionId: v.QuestionId, Corrected: true, Stats: stats, Corrections: getCorrectionString(*v.QuestionId)})
} else {
var z int64 = 0
survey.Direct = &z
survey.Update()
survey.WSWriteAll(WSMessage{Action: "pause"})
WSAdminWriteAll(WSMessage{Action: "pause", SurveyId: &survey.Id})
}
})
v.Corrected = false
v.Stats = nil
} else {
survey.Corrected = v.Corrected
if v.Corrected {
v.Corrections = getCorrectionString(*v.QuestionId)
if v.Stats != nil {
v.Stats = getResponsesStats(*v.QuestionId)
} else {
v.Stats = nil
}
} else {
v.Stats = nil
}
}
_, err = survey.Update()
if err != nil {
log.Println("Unable to update survey:", err)
}
survey.WSWriteAll(v)
v.SurveyId = &survey.Id
WSAdminWriteAll(v)
}
} else if v.Action == "pause" {
if survey, err := getSurvey(sid); err != nil {
log.Println("Unable to retrieve survey:", err)
} else {
var u int64 = 0
survey.Direct = &u
_, err = survey.Update()
if err != nil {
log.Println("Unable to update survey:", err)
}
survey.WSWriteAll(v)
v.SurveyId = &survey.Id
WSAdminWriteAll(v)
}
} else if v.Action == "end" {
if survey, err := getSurvey(sid); err != nil {
log.Println("Unable to retrieve survey:", err)
} else {
survey.EndAvailability = time.Now()
survey.Direct = nil
_, err = survey.Update()
if err != nil {
log.Println("Unable to update survey:", err)
}
survey.WSCloseAll("Fin du live")
v.SurveyId = &survey.Id
WSAdminWriteAll(v)
}
} else if v.Action == "get_stats" {
err = wsjson.Write(context.Background(), ws, WSMessage{Action: "stats", Stats: WSSurveyStats(int64(sid))})
} else if v.Action == "get_responses" {
if survey, err := getSurvey(sid); err != nil {
log.Println("Unable to retrieve survey:", err)
} else if questions, err := survey.GetQuestions(); err != nil {
log.Println("Unable to retrieve questions:", err)
} else {
for _, q := range questions {
if responses, err := q.GetResponses(); err != nil {
log.Println("Unable to retrieve questions:", err)
} else {
for _, r := range responses {
wsjson.Write(context.Background(), ws, WSMessage{Action: "new_response", UserId: &r.IdUser, QuestionId: &q.Id, Response: r.Answer})
}
}
}
}
} else if v.Action == "get_asks" {
if survey, err := getSurvey(sid); err != nil {
log.Println("Unable to retrieve survey:", err)
} else if asks, err := survey.GetAsks(v.Response == ""); err != nil {
log.Println("Unable to retrieve asks:", err)
} else {
for _, a := range asks {
wsjson.Write(context.Background(), ws, WSMessage{Action: "new_ask", UserId: &a.IdUser, QuestionId: &a.Id, Response: a.Content})
}
}
} else if v.Action == "mark_answered" && v.QuestionId != nil {
if asks, err := GetAsk(int(*v.QuestionId)); err != nil {
log.Println("Unable to retrieve ask:", err)
} else {
asks.Answered = true
err = asks.Update()
if err != nil {
log.Println("Unable to update:", err)
}
}
} else if v.Action == "mark_answered" && v.Response == "all" {
if survey, err := getSurvey(sid); err != nil {
log.Println("Unable to retrieve survey:", err)
} else if asks, err := survey.GetAsks(v.Response == ""); err != nil {
log.Println("Unable to retrieve asks:", err)
} else {
for _, ask := range asks {
ask.Answered = true
err = ask.Update()
if err != nil {
log.Println("Unable to update:", err)
}
}
}
} else if v.Action == "where_are_you" {
survey.WSWriteAll(v)
} else {
log.Println("Unknown admin action:", v.Action)
}
}
}(ch, int(survey.Id))
}
func WSAdminWriteAll(message WSMessage) {
WSAdminMutex.RLock()
defer WSAdminMutex.RUnlock()
for _, ws := range WSAdmin {
ws.c <- message
}
}
func (s *Survey) WSAdminWriteAll(message WSMessage) {
WSAdminMutex.RLock()
defer WSAdminMutex.RUnlock()
for _, ws := range WSAdmin {
if ws.sid == s.Id {
ws.c <- message
}
}
}