Implement basic queue, where is it impossible to run two identical job

in parallel
This commit is contained in:
nemunaire 2021-05-09 19:18:48 +02:00
parent 8259aaa474
commit af96f82831
4 changed files with 81 additions and 4 deletions

View File

@ -41,7 +41,7 @@ func PsName() (ret map[string]string, err error) {
return ret, err return ret, err
} }
func Run(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) { func Create(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) {
cli, err := newCli() cli, err := newCli()
if err != nil { if err != nil {
return "", err return "", err
@ -69,12 +69,30 @@ func Run(jobtype string, image string, cmd []string, mounts []mount.Mount) (stri
return "", err return "", err
} }
err = cli.ContainerStart(context.Background(), ctnr.ID, types.ContainerStartOptions{}) return ctnr.ID, nil
if err != nil {
return "", err
} }
return ctnr.ID, nil func Start(id string) error {
cli, err := newCli()
if err != nil {
return err
}
err = cli.ContainerStart(context.Background(), id, types.ContainerStartOptions{})
if err != nil {
return err
}
return nil
}
func Run(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) {
ctrid, err := Create(jobtype, image, cmd, mounts)
if err != nil {
return ctrid, err
}
return ctrid, Start(ctrid)
} }
func Logs(id string) (io.ReadCloser, error) { func Logs(id string) (io.ReadCloser, error) {

View File

@ -3,8 +3,33 @@ package engine
import ( import (
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"strings"
) )
func GenContainerPrefix(jobtype string) string { func GenContainerPrefix(jobtype string) string {
return fmt.Sprintf("minifaas-%x-", sha256.Sum224([]byte(jobtype))) return fmt.Sprintf("minifaas-%x-", sha256.Sum224([]byte(jobtype)))
} }
func FilterRunningContainers(jobtype string, ctrs map[string]string) (ret []string) {
prefix := GenContainerPrefix(jobtype)
for cname, _ := range ctrs {
if jobtype == "" || strings.HasPrefix(cname, prefix) {
ret = append(ret, cname)
}
}
return
}
func CountRunningContainers(jobtype string, ctrs map[string]string) (n int) {
prefix := GenContainerPrefix(jobtype)
for cname, _ := range ctrs {
if jobtype == "" || strings.HasPrefix(strings.TrimPrefix(cname, "/"), prefix) {
n += 1
}
}
return
}

10
jobs.go
View File

@ -1,6 +1,8 @@
package main package main
import ( import (
"fmt"
"github.com/docker/docker/api/types/mount" "github.com/docker/docker/api/types/mount"
"github.com/nemunaire/minifaas/engine/docker" "github.com/nemunaire/minifaas/engine/docker"
@ -42,6 +44,14 @@ func RunJob(jobtype string) (string, error) {
} }
} }
n, err := CountRunningContainers(jobtype)
if err != nil {
return "", err
}
if n > 0 {
return "", fmt.Errorf("Please wait, there is already a similar job running.")
}
return docker.Run( return docker.Run(
jobtype, jobtype,
jobs[jobtype].Image, jobs[jobtype].Image,

24
queue.go Normal file
View File

@ -0,0 +1,24 @@
package main
import (
"github.com/nemunaire/minifaas/engine"
"github.com/nemunaire/minifaas/engine/docker"
)
func FilterRunningContainers(jobtype string) (ret []string, err error) {
ctrs, errr := docker.PsName()
if errr != nil {
return nil, errr
}
return engine.FilterRunningContainers(jobtype, ctrs), nil
}
func CountRunningContainers(jobtype string) (n int, err error) {
ctrs, errr := docker.PsName()
if errr != nil {
return 0, errr
}
return engine.CountRunningContainers(jobtype, ctrs), nil
}