diff --git a/app.go b/app.go index bf5ff22..5fe78bd 100644 --- a/app.go +++ b/app.go @@ -69,8 +69,7 @@ func NewApp(cfg *config.Config) App { }) router.GET("/api/jobs/:jobid/logs", func(c *gin.Context) { - // Check ID exists - _, err := docker.GetContainer(c.Param("jobid")) + stream, err := docker.Logs(c.Param("jobid")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return @@ -82,16 +81,6 @@ func NewApp(cfg *config.Config) App { } defer ws.Close(websocket.StatusInternalError, "the sky is falling") - for !docker.HasStarted(c.Param("jobid")) { - time.Sleep(500 * time.Millisecond) - } - - stream, err := docker.Logs(c.Param("jobid")) - if err != nil { - ws.Close(websocket.StatusNormalClosure, "") - log.Fatal("Something goes wrong during log access:", err) - } - r, w := io.Pipe() ctx, cancel := context.WithTimeout(c.Request.Context(), time.Second*10) @@ -130,7 +119,6 @@ func (app *App) Start() { Handler: app.router, } - log.Printf("Ready, listening on %s\n", app.cfg.Bind) if err := app.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("listen: %s\n", err) } diff --git a/engine/docker/docker.go b/engine/docker/docker.go index 89c08f6..f459e9d 100644 --- a/engine/docker/docker.go +++ b/engine/docker/docker.go @@ -16,17 +16,17 @@ func newCli() (*client.Client, error) { return client.NewClientWithOpts(client.FromEnv) } -func Ps(opts types.ContainerListOptions) ([]types.Container, error) { +func Ps() ([]types.Container, error) { cli, err := newCli() if err != nil { return nil, err } - return cli.ContainerList(context.Background(), opts) + return cli.ContainerList(context.Background(), types.ContainerListOptions{}) } func PsName() (ret map[string]string, err error) { - containers, err := Ps(types.ContainerListOptions{}) + containers, err := Ps() if err != nil { return nil, err } @@ -41,24 +41,6 @@ func PsName() (ret map[string]string, err error) { return ret, err } -func GetContainer(containerID string) (ctr types.ContainerJSON, err error) { - cli, err := newCli() - if err != nil { - return types.ContainerJSON{}, err - } - - return cli.ContainerInspect(context.Background(), containerID) -} - -func HasStarted(containerID string) bool { - ctr, err := GetContainer(containerID) - if err != nil { - return false - } - - return ctr.State.Status == "running" || ctr.State.Status == "paused" || ctr.State.Status == "restarting" || ctr.State.Status == "removing" || ctr.State.Status == "exited" || ctr.State.Status == "dead" -} - func Create(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) { cli, err := newCli() if err != nil { diff --git a/engine/queue.go b/engine/queue.go index f7d0075..c5f4877 100644 --- a/engine/queue.go +++ b/engine/queue.go @@ -3,36 +3,11 @@ package engine import ( "crypto/sha256" "fmt" - "log" "strings" - - "github.com/nemunaire/minifaas/jobs" ) -const CTR_NAME_PREFIX = "minifaas" - func GenContainerPrefix(jobtype string) string { - return fmt.Sprintf("%s-%x-", CTR_NAME_PREFIX, sha256.Sum224([]byte(jobtype))) -} - -func ParseContainerName(name string) (jobtype, id string, err error) { - if !strings.HasPrefix(strings.TrimPrefix(name, "/"), CTR_NAME_PREFIX+"-") { - return "", "", fmt.Errorf("This is not a %s job: starting with %q", CTR_NAME_PREFIX, name) - } - - tmp := strings.Split(name, "-") - if len(tmp) < 3 { - return "", "", fmt.Errorf("This is not a %s job: %q didn't has at least 3 args", CTR_NAME_PREFIX, name) - } - - jobtype = jobs.GetJobType(tmp[1]) - if jobtype == "" { - return "", "", fmt.Errorf("This is not a %s job: unknown job type %q", CTR_NAME_PREFIX, tmp[1]) - } - - id = strings.Join(tmp[2:], "-") - - return + return fmt.Sprintf("minifaas-%x-", sha256.Sum224([]byte(jobtype))) } func FilterRunningContainers(jobtype string, ctrs map[string]string) (ret []string) { @@ -50,10 +25,7 @@ func FilterRunningContainers(jobtype string, ctrs map[string]string) (ret []stri func CountRunningContainers(jobtype string, ctrs map[string]string) (n int) { prefix := GenContainerPrefix(jobtype) - log.Println(ctrs) - for cname, _ := range ctrs { - log.Println(strings.TrimPrefix(cname, "/"), prefix) if jobtype == "" || strings.HasPrefix(strings.TrimPrefix(cname, "/"), prefix) { n += 1 } diff --git a/jobs.go b/jobs.go index 2738e2c..d8d0023 100644 --- a/jobs.go +++ b/jobs.go @@ -1,17 +1,30 @@ package main import ( + "fmt" + "github.com/docker/docker/api/types/mount" "github.com/nemunaire/minifaas/engine/docker" - "github.com/nemunaire/minifaas/jobs" ) -func RunJob(jobtype string) (string, error) { - job := jobs.GetJob(jobtype) +type Job struct { + Image string + Cmd []string + DataMount bool +} +var jobs = map[string]Job{ + "counter": { + Image: "alpine", + Cmd: []string{"sh", "-c", "touch /data/work_done; for i in `seq 10`; do echo $i; sleep 0.5; done"}, + DataMount: true, + }, +} + +func RunJob(jobtype string) (string, error) { var mnts []mount.Mount - if job.DataMount { + if jobs[jobtype].DataMount { myVolume, err := docker.CreateVolumeDir("/data", false) if err != nil { return "", err @@ -20,21 +33,29 @@ func RunJob(jobtype string) (string, error) { } // Check if the image is here - hasimg, err := docker.HasImage(job.Image) + hasimg, err := docker.HasImage(jobs[jobtype].Image) if err != nil { return "", err } if !hasimg { - err = docker.PullImage(job.Image) + err = docker.PullImage(jobs[jobtype].Image) if err != nil { return "", err } } - return docker.Create( + n, err := CountRunningContainers(jobtype) + if err != nil { + return "", err + } + if n > 0 { + return "", fmt.Errorf("Please wait, there is already a similar job running.") + } + + return docker.Run( jobtype, - job.Image, - job.Cmd, + jobs[jobtype].Image, + jobs[jobtype].Cmd, mnts, ) } diff --git a/jobs/jobs.go b/jobs/jobs.go deleted file mode 100644 index 767e406..0000000 --- a/jobs/jobs.go +++ /dev/null @@ -1,41 +0,0 @@ -package jobs - -import ( - "crypto/sha256" - "fmt" -) - -type Job struct { - Image string - Cmd []string - DataMount bool -} - -var jobs = map[string]Job{ - "counter": { - Image: "alpine", - Cmd: []string{"sh", "-c", "touch /data/work_done; for i in `seq 10`; do echo $i; sleep 0.5; done"}, - DataMount: true, - }, -} - -var revJobs = map[string]string{} - -func init() { - for jobtype := range jobs { - revJobs[fmt.Sprintf("%x", sha256.Sum224([]byte(jobtype)))] = jobtype - } -} - -func GetJobType(hashJobType string) (jobtype string) { - var ok bool - if jobtype, ok = revJobs[hashJobType]; !ok { - jobtype = "" - } - - return -} - -func GetJob(jobtype string) (job Job) { - return jobs[jobtype] -} diff --git a/main.go b/main.go index db4a924..6e114fb 100644 --- a/main.go +++ b/main.go @@ -18,12 +18,11 @@ func main() { a := NewApp(cfg) go a.Start() - go runQueue() - quit := make(chan os.Signal) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit log.Println("Stopping the service...") a.Stop() log.Println("Stopped") + } diff --git a/queue.go b/queue.go index d59d63e..0067f46 100644 --- a/queue.go +++ b/queue.go @@ -1,12 +1,6 @@ package main import ( - "fmt" - "log" - "time" - - "github.com/docker/docker/api/types" - "github.com/nemunaire/minifaas/engine" "github.com/nemunaire/minifaas/engine/docker" ) @@ -28,50 +22,3 @@ func CountRunningContainers(jobtype string) (n int, err error) { return engine.CountRunningContainers(jobtype, ctrs), nil } - -func GetJobTypeFromNames(names []string) (jobtype string, err error) { - for _, name := range names { - if jobtype, _, err = engine.ParseContainerName(name); err == nil { - // We found it, jobtype is already defined, just return - return - } - } - - // Not returned before == not found - return "", fmt.Errorf("This is not a minifass job") -} - -func runQueue() { - for { - time.Sleep(time.Second) - //log.Println("queue: tick") - - ctrs, err := docker.Ps(types.ContainerListOptions{All: true}) - if err != nil { - log.Println("queue: run:", err.Error()) - continue - } - - for _, ctr := range ctrs { - if ctr.State == "created" { - jobtype, err := GetJobTypeFromNames(ctr.Names) - if err != nil { - log.Println(err.Error()) - continue - } - - n, err := CountRunningContainers(jobtype) - if err != nil { - continue - } - if n > 0 { - log.Printf("Waiting slot for container %s (jobtype=%s)", ctr.ID, jobtype) - continue - } - - log.Printf("Starting container %s (jobtype=%s) on available slot", ctr.ID, jobtype) - docker.Start(ctr.ID) - } - } - } -}