From af96f82831de43fc74dc537cf5e01ed3063ad3ef Mon Sep 17 00:00:00 2001 From: Pierre-Olivier Mercier Date: Sun, 9 May 2021 19:18:48 +0200 Subject: [PATCH] Implement basic queue, where is it impossible to run two identical job in parallel --- engine/docker/docker.go | 26 ++++++++++++++++++++++---- engine/queue.go | 25 +++++++++++++++++++++++++ jobs.go | 10 ++++++++++ queue.go | 24 ++++++++++++++++++++++++ 4 files changed, 81 insertions(+), 4 deletions(-) create mode 100644 queue.go diff --git a/engine/docker/docker.go b/engine/docker/docker.go index e47ac1f..f459e9d 100644 --- a/engine/docker/docker.go +++ b/engine/docker/docker.go @@ -41,7 +41,7 @@ func PsName() (ret map[string]string, err error) { return ret, err } -func Run(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) { +func Create(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) { cli, err := newCli() if err != nil { return "", err @@ -69,12 +69,30 @@ func Run(jobtype string, image string, cmd []string, mounts []mount.Mount) (stri return "", err } - err = cli.ContainerStart(context.Background(), ctnr.ID, types.ContainerStartOptions{}) + return ctnr.ID, nil +} + +func Start(id string) error { + cli, err := newCli() if err != nil { - return "", err + return err } - return ctnr.ID, nil + err = cli.ContainerStart(context.Background(), id, types.ContainerStartOptions{}) + if err != nil { + return err + } + + return nil +} + +func Run(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) { + ctrid, err := Create(jobtype, image, cmd, mounts) + if err != nil { + return ctrid, err + } + + return ctrid, Start(ctrid) } func Logs(id string) (io.ReadCloser, error) { diff --git a/engine/queue.go b/engine/queue.go index 089da8e..c5f4877 100644 --- a/engine/queue.go +++ b/engine/queue.go @@ -3,8 +3,33 @@ package engine import ( "crypto/sha256" "fmt" + "strings" ) func GenContainerPrefix(jobtype string) string { return fmt.Sprintf("minifaas-%x-", sha256.Sum224([]byte(jobtype))) } + +func FilterRunningContainers(jobtype string, ctrs map[string]string) (ret []string) { + prefix := GenContainerPrefix(jobtype) + + for cname, _ := range ctrs { + if jobtype == "" || strings.HasPrefix(cname, prefix) { + ret = append(ret, cname) + } + } + + return +} + +func CountRunningContainers(jobtype string, ctrs map[string]string) (n int) { + prefix := GenContainerPrefix(jobtype) + + for cname, _ := range ctrs { + if jobtype == "" || strings.HasPrefix(strings.TrimPrefix(cname, "/"), prefix) { + n += 1 + } + } + + return +} diff --git a/jobs.go b/jobs.go index 42199f1..d8d0023 100644 --- a/jobs.go +++ b/jobs.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/docker/docker/api/types/mount" "github.com/nemunaire/minifaas/engine/docker" @@ -42,6 +44,14 @@ func RunJob(jobtype string) (string, error) { } } + n, err := CountRunningContainers(jobtype) + if err != nil { + return "", err + } + if n > 0 { + return "", fmt.Errorf("Please wait, there is already a similar job running.") + } + return docker.Run( jobtype, jobs[jobtype].Image, diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..0067f46 --- /dev/null +++ b/queue.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/nemunaire/minifaas/engine" + "github.com/nemunaire/minifaas/engine/docker" +) + +func FilterRunningContainers(jobtype string) (ret []string, err error) { + ctrs, errr := docker.PsName() + if errr != nil { + return nil, errr + } + + return engine.FilterRunningContainers(jobtype, ctrs), nil +} + +func CountRunningContainers(jobtype string) (n int, err error) { + ctrs, errr := docker.PsName() + if errr != nil { + return 0, errr + } + + return engine.CountRunningContainers(jobtype, ctrs), nil +}