From be8934963229b7a378b2db8f6994564f9af31039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 31 Jan 2022 16:05:27 +0100 Subject: [PATCH] Very basic non-functional framework for a task runner Also has some login/logout functionality for storing stuff in the DB. --- cmd/flamenco-worker-poc/main.go | 4 +- internal/manager/api_impl/api_impl.go | 1 + internal/manager/api_impl/worker_auth.go | 12 +++++- internal/manager/api_impl/workers.go | 46 ++++++++++++++++++--- internal/manager/persistence/workers.go | 18 +++++--- internal/worker/state_asleep.go | 20 +++++++++ internal/worker/state_awake.go | 40 +++++++++++++++--- internal/worker/state_shutdown.go | 20 +++++++++ internal/worker/statemachine.go | 20 +++++++++ internal/worker/task_executor.go | 52 ++++++++++++++++++++++++ internal/worker/worker.go | 4 +- 11 files changed, 217 insertions(+), 20 deletions(-) create mode 100644 internal/worker/task_executor.go diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go index c8344a34..aea70e13 100644 --- a/cmd/flamenco-worker-poc/main.go +++ b/cmd/flamenco-worker-poc/main.go @@ -67,8 +67,8 @@ func main() { shutdownComplete = make(chan struct{}) - taskRunner := struct{}{} - w = worker.NewWorker(client, taskRunner) + taskRunner := worker.TaskExecutor{} + w = worker.NewWorker(client, &taskRunner) // Handle Ctrl+C c := make(chan os.Signal, 1) diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index 20a197f5..380c1240 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -42,6 +42,7 @@ type PersistenceService interface { CreateWorker(ctx context.Context, w *persistence.Worker) error FetchWorker(ctx context.Context, uuid string) (*persistence.Worker, error) + SaveWorker(ctx context.Context, w *persistence.Worker) error } type JobCompiler interface { diff --git a/internal/manager/api_impl/worker_auth.go b/internal/manager/api_impl/worker_auth.go index 2027aeb3..4f3c9c2f 100644 --- a/internal/manager/api_impl/worker_auth.go +++ b/internal/manager/api_impl/worker_auth.go @@ -79,7 +79,7 @@ func WorkerAuth(ctx context.Context, authInfo *openapi3filter.AuthenticationInpu return nil } -// requestWorker returns the Worker associated with this HTTP request. +// requestWorker returns the Worker associated with this HTTP request, or nil if there is none. func requestWorker(e echo.Context) *persistence.Worker { ctx := e.Request().Context() worker, ok := ctx.Value(workerKey).(*persistence.Worker) @@ -88,3 +88,13 @@ func requestWorker(e echo.Context) *persistence.Worker { } return nil } + +// requestWorkerOrPanic returns the Worker associated with this HTTP request, or panics if there is none. +func requestWorkerOrPanic(e echo.Context) *persistence.Worker { + w := requestWorker(e) + if w == nil { + logger := requestLogger(e) + logger.Panic().Msg("no worker available where one was expected") + } + return w +} diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index fdb8bd18..a866145d 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -90,10 +90,23 @@ func (f *Flamenco) SignOn(e echo.Context) error { logger.Info().Msg("worker signing on") - return e.JSON(http.StatusOK, &api.WorkerStateChange{ - // TODO: look up proper status in DB. - StatusRequested: api.WorkerStatusAwake, - }) + w := requestWorkerOrPanic(e) + w.Status = api.WorkerStatusStarting + err = f.persist.SaveWorker(e.Request().Context(), w) + if err != nil { + logger.Warn().Err(err). + Str("newStatus", string(w.Status)). + Msg("error storing Worker in database") + return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") + } + + resp := api.WorkerStateChange{} + if w.StatusRequested != "" { + resp.StatusRequested = w.StatusRequested + } else { + resp.StatusRequested = api.WorkerStatusAwake + } + return e.JSON(http.StatusOK, resp) } func (f *Flamenco) SignOff(e echo.Context) error { @@ -107,8 +120,20 @@ func (f *Flamenco) SignOff(e echo.Context) error { } logger.Info().Msg("worker signing off") + w := requestWorkerOrPanic(e) + w.Status = api.WorkerStatusOffline + // TODO: check whether we should pass the request context here, or a generic + // background context, as this should be stored even when the HTTP connection + // is aborted. + err = f.persist.SaveWorker(e.Request().Context(), w) + if err != nil { + logger.Warn(). + Err(err). + Str("newStatus", string(w.Status)). + Msg("error storing worker status in database") + return sendAPIError(e, http.StatusInternalServerError, "error storing new status in database") + } - // TODO: store status in DB. return e.String(http.StatusNoContent, "") } @@ -131,6 +156,17 @@ func (f *Flamenco) WorkerStateChanged(e echo.Context) error { } logger.Info().Str("newStatus", string(req.Status)).Msg("worker changed status") + + w := requestWorkerOrPanic(e) + w.Status = req.Status + err = f.persist.SaveWorker(e.Request().Context(), w) + if err != nil { + logger.Warn().Err(err). + Str("newStatus", string(w.Status)). + Msg("error storing Worker in database") + return sendAPIError(e, http.StatusInternalServerError, "error storing worker in database") + } + return e.String(http.StatusNoContent, "") } diff --git a/internal/manager/persistence/workers.go b/internal/manager/persistence/workers.go index cb7d1361..5c80d605 100644 --- a/internal/manager/persistence/workers.go +++ b/internal/manager/persistence/workers.go @@ -34,11 +34,12 @@ type Worker struct { Secret string `gorm:"type:varchar(255);not null"` Name string `gorm:"type:varchar(64);not null"` - Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address. - LastActivity string `gorm:"type:varchar(255);not null"` - Platform string `gorm:"type:varchar(16);not null"` - Software string `gorm:"type:varchar(32);not null"` - Status api.WorkerStatus `gorm:"type:varchar(16);not null"` + Address string `gorm:"type:varchar(39);not null;index"` // 39 = max length of IPv6 address. + LastActivity string `gorm:"type:varchar(255);not null"` + Platform string `gorm:"type:varchar(16);not null"` + Software string `gorm:"type:varchar(32);not null"` + Status api.WorkerStatus `gorm:"type:varchar(16);not null"` + StatusRequested api.WorkerStatus `gorm:"type:varchar(16);not null;default:''"` SupportedTaskTypes string `gorm:"type:varchar(255);not null"` // comma-separated list of task types. } @@ -58,3 +59,10 @@ func (db *DB) FetchWorker(ctx context.Context, uuid string) (*Worker, error) { } return &w, nil } + +func (db *DB) SaveWorker(ctx context.Context, w *Worker) error { + if err := db.gormDB.Save(w).Error; err != nil { + return fmt.Errorf("error saving worker: %v", err) + } + return nil +} diff --git a/internal/worker/state_asleep.go b/internal/worker/state_asleep.go index 23584b61..ee1c2a83 100644 --- a/internal/worker/state_asleep.go +++ b/internal/worker/state_asleep.go @@ -1,5 +1,25 @@ package worker +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + import ( "context" "net/http" diff --git a/internal/worker/state_awake.go b/internal/worker/state_awake.go index fc6cc063..59e2593e 100644 --- a/internal/worker/state_awake.go +++ b/internal/worker/state_awake.go @@ -1,5 +1,25 @@ package worker +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + import ( "context" "errors" @@ -34,13 +54,20 @@ func (w *Worker) gotoStateAwake(ctx context.Context) { func (w *Worker) runStateAwake(ctx context.Context) { defer w.doneWg.Done() - task := w.fetchTask(ctx) - if task == nil { - return - } - // TODO: actually execute the task - log.Error().Interface("task", *task).Msg("task execution not implemented yet") + for { + task := w.fetchTask(ctx) + if task == nil { + return + } + + err := w.taskRunner.Run(ctx, *task) + if err != nil { + log.Warn().Err(err).Interface("task", *task).Msg("error executing task") + } + + // TODO: send the result of the execution back to the Manager. + } } // fetchTasks periodically tries to fetch a task from the Manager, returning it when obtained. @@ -70,6 +97,7 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask { resp, err := w.client.ScheduleTaskWithResponse(ctx) if err != nil { log.Error().Err(err).Msg("error obtaining task") + return nil } switch { case resp.JSON200 != nil: diff --git a/internal/worker/state_shutdown.go b/internal/worker/state_shutdown.go index 5bbb8343..10064a83 100644 --- a/internal/worker/state_shutdown.go +++ b/internal/worker/state_shutdown.go @@ -1,5 +1,25 @@ package worker +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + import ( "context" "os" diff --git a/internal/worker/statemachine.go b/internal/worker/statemachine.go index db34cdef..b3bf9b83 100644 --- a/internal/worker/statemachine.go +++ b/internal/worker/statemachine.go @@ -1,5 +1,25 @@ package worker +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + import ( "context" diff --git a/internal/worker/task_executor.go b/internal/worker/task_executor.go new file mode 100644 index 00000000..06b6eb19 --- /dev/null +++ b/internal/worker/task_executor.go @@ -0,0 +1,52 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "context" + "errors" + "time" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +type TaskExecutor struct{} + +var _ TaskRunner = (*TaskExecutor)(nil) + +func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error { + logger := log.With().Str("task", task.Uuid).Logger() + logger.Info().Str("taskType", task.TaskType).Msg("starting task") + + for _, cmd := range task.Commands { + cmdLogger := logger.With().Str("command", cmd.Name).Interface("settings", cmd.Settings).Logger() + cmdLogger.Info().Msg("running command") + + select { + case <-ctx.Done(): + cmdLogger.Warn().Msg("command execution aborted due to context shutdown") + case <-time.After(1 * time.Second): + cmdLogger.Debug().Msg("mocked duration of command") + } + } + return errors.New("task running not implemented") +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 1d0fc8e5..99ad6dde 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -29,7 +29,9 @@ type Worker struct { type StateStarter func(context.Context) -type TaskRunner interface{} +type TaskRunner interface { + Run(ctx context.Context, task api.AssignedTask) error +} // NewWorker constructs and returns a new Worker. func NewWorker(