This repository has been archived on 2024-03-03. You can view files and clone it, but cannot push or open issues or pull requests.
minifaas/queue.go

78 lines
1.6 KiB
Go

package main
import (
"fmt"
"log"
"time"
"github.com/docker/docker/api/types"
"github.com/nemunaire/minifaas/engine"
"github.com/nemunaire/minifaas/engine/docker"
)
func FilterRunningContainers(jobtype string) (ret []string, err error) {
ctrs, errr := docker.PsName()
if errr != nil {
return nil, errr
}
return engine.FilterRunningContainers(jobtype, ctrs), nil
}
func CountRunningContainers(jobtype string) (n int, err error) {
ctrs, errr := docker.PsName()
if errr != nil {
return 0, errr
}
return engine.CountRunningContainers(jobtype, ctrs), nil
}
func GetJobTypeFromNames(names []string) (jobtype string, err error) {
for _, name := range names {
if jobtype, _, err = engine.ParseContainerName(name); err == nil {
// We found it, jobtype is already defined, just return
return
}
}
// Not returned before == not found
return "", fmt.Errorf("This is not a minifass job")
}
func runQueue() {
for {
time.Sleep(time.Second)
//log.Println("queue: tick")
ctrs, err := docker.Ps(types.ContainerListOptions{All: true})
if err != nil {
log.Println("queue: run:", err.Error())
continue
}
for _, ctr := range ctrs {
if ctr.State == "created" {
jobtype, err := GetJobTypeFromNames(ctr.Names)
if err != nil {
log.Println(err.Error())
continue
}
n, err := CountRunningContainers(jobtype)
if err != nil {
continue
}
if n > 0 {
log.Printf("Waiting slot for container %s (jobtype=%s)", ctr.ID, jobtype)
continue
}
log.Printf("Starting container %s (jobtype=%s) on available slot", ctr.ID, jobtype)
docker.Start(ctr.ID)
}
}
}
}