diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go
index c8344a34..aea70e13 100644
--- a/cmd/flamenco-worker-poc/main.go
+++ b/cmd/flamenco-worker-poc/main.go
@@ -67,8 +67,8 @@ func main() {
shutdownComplete = make(chan struct{})
- taskRunner := struct{}{}
- w = worker.NewWorker(client, taskRunner)
+ taskRunner := worker.TaskExecutor{}
+ w = worker.NewWorker(client, &taskRunner)
// Handle Ctrl+C
c := make(chan os.Signal, 1)
diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go
index 20a197f5..380c1240 100644
--- a/internal/manager/api_impl/api_impl.go
+++ b/internal/manager/api_impl/api_impl.go
@@ -42,6 +42,7 @@ type PersistenceService interface {
CreateWorker(ctx context.Context, w *persistence.Worker) error
FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error)
+ SaveWorker(ctx context.Context, w *persistence.Worker) error
}
type JobCompiler interface {
diff --git a/internal/manager/api_impl/worker_auth.go b/internal/manager/api_impl/worker_auth.go
index 2027aeb3..4f3c9c2f 100644
--- a/internal/manager/api_impl/worker_auth.go
+++ b/internal/manager/api_impl/worker_auth.go
@@ -79,7 +79,7 @@ func WorkerAuth(ctx context.Context, authInfo *openapi3filter.AuthenticationInpu
return nil
}
-// requestWorker returns the Worker associated with this HTTP request.
+// requestWorker returns the Worker associated with this HTTP request, or nil if there is none.
func requestWorker(e echo.Context) *persistence.Worker {
ctx := e.Request().Context()
worker, ok := ctx.Value(workerKey).(*persistence.Worker)
@@ -88,3 +88,13 @@ func requestWorker(e echo.Context) *persistence.Worker {
}
return nil
}
+
+// requestWorkerOrPanic returns the Worker associated with this HTTP request, or panics if there is none.
+func requestWorkerOrPanic(e echo.Context) *persistence.Worker {
+ w := requestWorker(e)
+ if w == nil {
+ logger := requestLogger(e)
+ logger.Panic().Msg("no worker available where one was expected")
+ }
+ return w
+}
diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go
index fdb8bd18..a866145d 100644
--- a/internal/manager/api_impl/workers.go
+++ b/internal/manager/api_impl/workers.go
@@ -90,10 +90,23 @@ func (f *Flamenco) SignOn(e echo.Context) error {
logger.Info().Msg("worker signing on")
- return e.JSON(http.StatusOK, &api.WorkerStateChange{
- // TODO: look up proper status in DB.
- StatusRequested: api.WorkerStatusAwake,
- })
+ w := requestWorkerOrPanic(e)
+ w.Status = api.WorkerStatusStarting
+ err = f.persist.SaveWorker(e.Request().Context(), w)
+ if err != nil {
+ logger.Warn().Err(err).
+ Str("newStatus", string(w.Status)).
+ Msg("error storing Worker in database")
+ return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
+ }
+
+ resp := api.WorkerStateChange{}
+ if w.StatusRequested != "" {
+ resp.StatusRequested = w.StatusRequested
+ } else {
+ resp.StatusRequested = api.WorkerStatusAwake
+ }
+ return e.JSON(http.StatusOK, resp)
}
func (f *Flamenco) SignOff(e echo.Context) error {
@@ -107,8 +120,20 @@ func (f *Flamenco) SignOff(e echo.Context) error {
}
logger.Info().Msg("worker signing off")
+ w := requestWorkerOrPanic(e)
+ w.Status = api.WorkerStatusOffline
+ // TODO: check whether we should pass the request context here, or a generic
+ // background context, as this should be stored even when the HTTP connection
+ // is aborted.
+ err = f.persist.SaveWorker(e.Request().Context(), w)
+ if err != nil {
+ logger.Warn().
+ Err(err).
+ Str("newStatus", string(w.Status)).
+ Msg("error storing worker status in database")
+ return sendAPIError(e, http.StatusInternalServerError, "error storing new status in database")
+ }
- // TODO: store status in DB.
return e.String(http.StatusNoContent, "")
}
@@ -131,6 +156,17 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error {
}
logger.Info().Str("newStatus", string(req.Status)).Msg("worker changed status")
+
+ w := requestWorkerOrPanic(e)
+ w.Status = req.Status
+ err = f.persist.SaveWorker(e.Request().Context(), w)
+ if err != nil {
+ logger.Warn().Err(err).
+ Str("newStatus", string(w.Status)).
+ Msg("error storing Worker in database")
+ return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database")
+ }
+
return e.String(http.StatusNoContent, "")
}
diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go
index cb7d1361..5c80d605 100644
--- a/internal/manager/persistence/workers.go
+++ b/internal/manager/persistence/workers.go
@@ -34,11 +34,12 @@ type Worker struct {
Secret string `gorm:"type:varchar(255);not null"`
Name string `gorm:"type:varchar(64);not null"`
- Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address.
- LastActivity string `gorm:"type:varchar(255);not null"`
- Platform string `gorm:"type:varchar(16);not null"`
- Software string `gorm:"type:varchar(32);not null"`
- Status api.WorkerStatus `gorm:"type:varchar(16);not null"`
+ Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address.
+ LastActivity string `gorm:"type:varchar(255);not null"`
+ Platform string `gorm:"type:varchar(16);not null"`
+ Software string `gorm:"type:varchar(32);not null"`
+ Status api.WorkerStatus `gorm:"type:varchar(16);not null"`
+ StatusRequested api.WorkerStatus `gorm:"type:varchar(16);not null;default:''"`
SupportedTaskTypes string `gorm:"type:varchar(255);not null"` // comma-separated list of task types.
}
@@ -58,3 +59,10 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) {
}
return &w, nil
}
+
+func (db *DB) SaveWorker(ctx context.Context, w *Worker) error {
+ if err := db.gormDB.Save(w).Error; err != nil {
+ return fmt.Errorf("error saving worker: %v", err)
+ }
+ return nil
+}
diff --git a/internal/worker/state_asleep.go b/internal/worker/state_asleep.go
index 23584b61..ee1c2a83 100644
--- a/internal/worker/state_asleep.go
+++ b/internal/worker/state_asleep.go
@@ -1,5 +1,25 @@
package worker
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
import (
"context"
"net/http"
diff --git a/internal/worker/state_awake.go b/internal/worker/state_awake.go
index fc6cc063..59e2593e 100644
--- a/internal/worker/state_awake.go
+++ b/internal/worker/state_awake.go
@@ -1,5 +1,25 @@
package worker
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
import (
"context"
"errors"
@@ -34,13 +54,20 @@ func (w *Worker) gotoStateAwake(ctx context.Context) {
func (w *Worker) runStateAwake(ctx context.Context) {
defer w.doneWg.Done()
- task := w.fetchTask(ctx)
- if task == nil {
- return
- }
- // TODO: actually execute the task
- log.Error().Interface("task", *task).Msg("task execution not implemented yet")
+ for {
+ task := w.fetchTask(ctx)
+ if task == nil {
+ return
+ }
+
+ err := w.taskRunner.Run(ctx, *task)
+ if err != nil {
+ log.Warn().Err(err).Interface("task", *task).Msg("error executing task")
+ }
+
+ // TODO: send the result of the execution back to the Manager.
+ }
}
// fetchTasks periodically tries to fetch a task from the Manager, returning it when obtained.
@@ -70,6 +97,7 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask {
resp, err := w.client.ScheduleTaskWithResponse(ctx)
if err != nil {
log.Error().Err(err).Msg("error obtaining task")
+ return nil
}
switch {
case resp.JSON200 != nil:
diff --git a/internal/worker/state_shutdown.go b/internal/worker/state_shutdown.go
index 5bbb8343..10064a83 100644
--- a/internal/worker/state_shutdown.go
+++ b/internal/worker/state_shutdown.go
@@ -1,5 +1,25 @@
package worker
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
import (
"context"
"os"
diff --git a/internal/worker/statemachine.go b/internal/worker/statemachine.go
index db34cdef..b3bf9b83 100644
--- a/internal/worker/statemachine.go
+++ b/internal/worker/statemachine.go
@@ -1,5 +1,25 @@
package worker
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
import (
"context"
diff --git a/internal/worker/task_executor.go b/internal/worker/task_executor.go
new file mode 100644
index 00000000..06b6eb19
--- /dev/null
+++ b/internal/worker/task_executor.go
@@ -0,0 +1,52 @@
+package worker
+
+/* ***** BEGIN GPL LICENSE BLOCK *****
+ *
+ * Original Code Copyright (C) 2022 Blender Foundation.
+ *
+ * This file is part of Flamenco.
+ *
+ * Flamenco is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later
+ * version.
+ *
+ * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * Flamenco. If not, see .
+ *
+ * ***** END GPL LICENSE BLOCK ***** */
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ "github.com/rs/zerolog/log"
+ "gitlab.com/blender/flamenco-ng-poc/pkg/api"
+)
+
+type TaskExecutor struct{}
+
+var _ TaskRunner = (*TaskExecutor)(nil)
+
+func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error {
+ logger := log.With().Str("task", task.Uuid).Logger()
+ logger.Info().Str("taskType", task.TaskType).Msg("starting task")
+
+ for _, cmd := range task.Commands {
+ cmdLogger := logger.With().Str("command", cmd.Name).Interface("settings", cmd.Settings).Logger()
+ cmdLogger.Info().Msg("running command")
+
+ select {
+ case <-ctx.Done():
+ cmdLogger.Warn().Msg("command execution aborted due to context shutdown")
+ case <-time.After(1 * time.Second):
+ cmdLogger.Debug().Msg("mocked duration of command")
+ }
+ }
+ return errors.New("task running not implemented")
+}
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index 1d0fc8e5..99ad6dde 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -29,7 +29,9 @@ type Worker struct {
type StateStarter func(context.Context)
-type TaskRunner interface{}
+type TaskRunner interface {
+ Run(ctx context.Context, task api.AssignedTask) error
+}
// NewWorker constructs and returns a new Worker.
func NewWorker(