diff --git a/app.go b/app.go index 5fe78bd..bf5ff22 100644 --- a/app.go +++ b/app.go @@ -69,7 +69,8 @@ func NewApp(cfg *config.Config) App { }) router.GET("/api/jobs/:jobid/logs", func(c *gin.Context) { - stream, err := docker.Logs(c.Param("jobid")) + // Check ID exists + _, err := docker.GetContainer(c.Param("jobid")) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return @@ -81,6 +82,16 @@ func NewApp(cfg *config.Config) App { } defer ws.Close(websocket.StatusInternalError, "the sky is falling") + for !docker.HasStarted(c.Param("jobid")) { + time.Sleep(500 * time.Millisecond) + } + + stream, err := docker.Logs(c.Param("jobid")) + if err != nil { + ws.Close(websocket.StatusNormalClosure, "") + log.Fatal("Something goes wrong during log access:", err) + } + r, w := io.Pipe() ctx, cancel := context.WithTimeout(c.Request.Context(), time.Second*10) @@ -119,6 +130,7 @@ func (app *App) Start() { Handler: app.router, } + log.Printf("Ready, listening on %s\n", app.cfg.Bind) if err := app.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("listen: %s\n", err) } diff --git a/engine/docker/docker.go b/engine/docker/docker.go index f459e9d..89c08f6 100644 --- a/engine/docker/docker.go +++ b/engine/docker/docker.go @@ -16,17 +16,17 @@ func newCli() (*client.Client, error) { return client.NewClientWithOpts(client.FromEnv) } -func Ps() ([]types.Container, error) { +func Ps(opts types.ContainerListOptions) ([]types.Container, error) { cli, err := newCli() if err != nil { return nil, err } - return cli.ContainerList(context.Background(), types.ContainerListOptions{}) + return cli.ContainerList(context.Background(), opts) } func PsName() (ret map[string]string, err error) { - containers, err := Ps() + containers, err := Ps(types.ContainerListOptions{}) if err != nil { return nil, err } @@ -41,6 +41,24 @@ func PsName() (ret map[string]string, err error) { return ret, err } +func GetContainer(containerID string) (ctr types.ContainerJSON, err error) { + cli, err := newCli() + if err != nil { + return types.ContainerJSON{}, err + } + + return cli.ContainerInspect(context.Background(), containerID) +} + +func HasStarted(containerID string) bool { + ctr, err := GetContainer(containerID) + if err != nil { + return false + } + + return ctr.State.Status == "running" || ctr.State.Status == "paused" || ctr.State.Status == "restarting" || ctr.State.Status == "removing" || ctr.State.Status == "exited" || ctr.State.Status == "dead" +} + func Create(jobtype string, image string, cmd []string, mounts []mount.Mount) (string, error) { cli, err := newCli() if err != nil { diff --git a/engine/queue.go b/engine/queue.go index c5f4877..5fbc37e 100644 --- a/engine/queue.go +++ b/engine/queue.go @@ -6,8 +6,23 @@ import ( "strings" ) +const CTR_NAME_PREFIX = "minifaas" + func GenContainerPrefix(jobtype string) string { - return fmt.Sprintf("minifaas-%x-", sha256.Sum224([]byte(jobtype))) + return fmt.Sprintf("%s-%x-", CTR_NAME_PREFIX, sha256.Sum224([]byte(jobtype))) +} + +func ParseContainerName(name string) (jobtype, id string, err error) { + if !strings.HasPrefix(name, "/"+CTR_NAME_PREFIX+"-") { + return "", "", fmt.Errorf("This is not a %s job: starting with %q", CTR_NAME_PREFIX, name) + } + + tmp := strings.Split(name, "-") + if len(tmp) < 3 { + return "", "", fmt.Errorf("This is not a %s job: %q didn't has at least 3 args", CTR_NAME_PREFIX, name) + } + + return tmp[1], strings.Join(tmp[2:], "-"), nil } func FilterRunningContainers(jobtype string, ctrs map[string]string) (ret []string) { diff --git a/jobs.go b/jobs.go index d8d0023..e9d9066 100644 --- a/jobs.go +++ b/jobs.go @@ -1,8 +1,6 @@ package main import ( - "fmt" - "github.com/docker/docker/api/types/mount" "github.com/nemunaire/minifaas/engine/docker" @@ -44,15 +42,7 @@ func RunJob(jobtype string) (string, error) { } } - n, err := CountRunningContainers(jobtype) - if err != nil { - return "", err - } - if n > 0 { - return "", fmt.Errorf("Please wait, there is already a similar job running.") - } - - return docker.Run( + return docker.Create( jobtype, jobs[jobtype].Image, jobs[jobtype].Cmd, diff --git a/main.go b/main.go index 6e114fb..db4a924 100644 --- a/main.go +++ b/main.go @@ -18,11 +18,12 @@ func main() { a := NewApp(cfg) go a.Start() + go runQueue() + quit := make(chan os.Signal) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit log.Println("Stopping the service...") a.Stop() log.Println("Stopped") - } diff --git a/queue.go b/queue.go index 0067f46..d59d63e 100644 --- a/queue.go +++ b/queue.go @@ -1,6 +1,12 @@ package main import ( + "fmt" + "log" + "time" + + "github.com/docker/docker/api/types" + "github.com/nemunaire/minifaas/engine" "github.com/nemunaire/minifaas/engine/docker" ) @@ -22,3 +28,50 @@ func CountRunningContainers(jobtype string) (n int, err error) { return engine.CountRunningContainers(jobtype, ctrs), nil } + +func GetJobTypeFromNames(names []string) (jobtype string, err error) { + for _, name := range names { + if jobtype, _, err = engine.ParseContainerName(name); err == nil { + // We found it, jobtype is already defined, just return + return + } + } + + // Not returned before == not found + return "", fmt.Errorf("This is not a minifass job") +} + +func runQueue() { + for { + time.Sleep(time.Second) + //log.Println("queue: tick") + + ctrs, err := docker.Ps(types.ContainerListOptions{All: true}) + if err != nil { + log.Println("queue: run:", err.Error()) + continue + } + + for _, ctr := range ctrs { + if ctr.State == "created" { + jobtype, err := GetJobTypeFromNames(ctr.Names) + if err != nil { + log.Println(err.Error()) + continue + } + + n, err := CountRunningContainers(jobtype) + if err != nil { + continue + } + if n > 0 { + log.Printf("Waiting slot for container %s (jobtype=%s)", ctr.ID, jobtype) + continue + } + + log.Printf("Starting container %s (jobtype=%s) on available slot", ctr.ID, jobtype) + docker.Start(ctr.ID) + } + } + } +}