Add queuing system

This commit is contained in:
nemunaire 2021-07-31 17:16:38 +02:00
commit f2eacf41cf
6 changed files with 106 additions and 17 deletions

View file

@ -1,6 +1,12 @@
package main
import (
"fmt"
"log"
"time"
"github.com/docker/docker/api/types"
"github.com/nemunaire/minifaas/engine"
"github.com/nemunaire/minifaas/engine/docker"
)
@ -22,3 +28,50 @@ func CountRunningContainers(jobtype string) (n int, err error) {
return engine.CountRunningContainers(jobtype, ctrs), nil
}
func GetJobTypeFromNames(names []string) (jobtype string, err error) {
for _, name := range names {
if jobtype, _, err = engine.ParseContainerName(name); err == nil {
// We found it, jobtype is already defined, just return
return
}
}
// Not returned before == not found
return "", fmt.Errorf("This is not a minifass job")
}
func runQueue() {
for {
time.Sleep(time.Second)
//log.Println("queue: tick")
ctrs, err := docker.Ps(types.ContainerListOptions{All: true})
if err != nil {
log.Println("queue: run:", err.Error())
continue
}
for _, ctr := range ctrs {
if ctr.State == "created" {
jobtype, err := GetJobTypeFromNames(ctr.Names)
if err != nil {
log.Println(err.Error())
continue
}
n, err := CountRunningContainers(jobtype)
if err != nil {
continue
}
if n > 0 {
log.Printf("Waiting slot for container %s (jobtype=%s)", ctr.ID, jobtype)
continue
}
log.Printf("Starting container %s (jobtype=%s) on available slot", ctr.ID, jobtype)
docker.Start(ctr.ID)
}
}
}
}