78 lines
1.6 KiB
Go
78 lines
1.6 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/docker/docker/api/types"
|
|
|
|
"github.com/nemunaire/minifaas/engine"
|
|
"github.com/nemunaire/minifaas/engine/docker"
|
|
)
|
|
|
|
func FilterRunningContainers(jobtype string) (ret []string, err error) {
|
|
ctrs, errr := docker.PsName()
|
|
if errr != nil {
|
|
return nil, errr
|
|
}
|
|
|
|
return engine.FilterRunningContainers(jobtype, ctrs), nil
|
|
}
|
|
|
|
func CountRunningContainers(jobtype string) (n int, err error) {
|
|
ctrs, errr := docker.PsName()
|
|
if errr != nil {
|
|
return 0, errr
|
|
}
|
|
|
|
return engine.CountRunningContainers(jobtype, ctrs), nil
|
|
}
|
|
|
|
func GetJobTypeFromNames(names []string) (jobtype string, err error) {
|
|
for _, name := range names {
|
|
if jobtype, _, err = engine.ParseContainerName(name); err == nil {
|
|
// We found it, jobtype is already defined, just return
|
|
return
|
|
}
|
|
}
|
|
|
|
// Not returned before == not found
|
|
return "", fmt.Errorf("This is not a minifass job")
|
|
}
|
|
|
|
func runQueue() {
|
|
for {
|
|
time.Sleep(time.Second)
|
|
//log.Println("queue: tick")
|
|
|
|
ctrs, err := docker.Ps(types.ContainerListOptions{All: true})
|
|
if err != nil {
|
|
log.Println("queue: run:", err.Error())
|
|
continue
|
|
}
|
|
|
|
for _, ctr := range ctrs {
|
|
if ctr.State == "created" {
|
|
jobtype, err := GetJobTypeFromNames(ctr.Names)
|
|
if err != nil {
|
|
log.Println(err.Error())
|
|
continue
|
|
}
|
|
|
|
n, err := CountRunningContainers(jobtype)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if n > 0 {
|
|
log.Printf("Waiting slot for container %s (jobtype=%s)", ctr.ID, jobtype)
|
|
continue
|
|
}
|
|
|
|
log.Printf("Starting container %s (jobtype=%s) on available slot", ctr.ID, jobtype)
|
|
docker.Start(ctr.ID)
|
|
}
|
|
}
|
|
}
|
|
}
|