diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go index aea70e13..c54df055 100644 --- a/cmd/flamenco-worker-poc/main.go +++ b/cmd/flamenco-worker-poc/main.go @@ -39,6 +39,7 @@ import ( var ( w *worker.Worker + listener *worker.Listener shutdownComplete chan struct{} ) @@ -66,9 +67,12 @@ func main() { sctxCancelFunc() shutdownComplete = make(chan struct{}) + workerCtx, workerCtxCancel := context.WithCancel(context.Background()) - taskRunner := worker.TaskExecutor{} - w = worker.NewWorker(client, &taskRunner) + listener = worker.NewListener(client) + cmdRunner := worker.NewCommandExecutor(listener) + taskRunner := worker.NewTaskExecutor(cmdRunner, listener) + w = worker.NewWorker(client, taskRunner) // Handle Ctrl+C c := make(chan os.Signal, 1) @@ -76,13 +80,14 @@ func main() { signal.Notify(c, syscall.SIGTERM) go func() { for signum := range c { + workerCtxCancel() // Run the shutdown sequence in a goroutine, so that multiple Ctrl+C presses can be handled in parallel. go shutdown(signum) } }() - workerCtx := context.Background() - w.Start(workerCtx, startupState) + go listener.Run(workerCtx) + go w.Start(workerCtx, startupState) <-shutdownComplete @@ -99,6 +104,7 @@ func shutdown(signum os.Signal) { defer cancelFunc() w.SignOff(shutdownCtx) w.Close() + listener.Wait() } close(done) }() diff --git a/internal/worker/command_executor.go b/internal/worker/command_executor.go new file mode 100644 index 00000000..6f7b2d96 --- /dev/null +++ b/internal/worker/command_executor.go @@ -0,0 +1,55 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "context" + "errors" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +type CommandListener interface { + // LogProduced sends any logging to whatever service for storing logging. + LogProduced(taskID TaskID, logLines []string) error + // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). + OutputProduced(taskID TaskID, outputLocation string) error +} + +type CommandExecutor struct { + listener CommandListener +} + +var _ CommandRunner = (*CommandExecutor)(nil) + +func NewCommandExecutor(listener CommandListener) *CommandExecutor { + return &CommandExecutor{ + listener: listener, + } +} + +func (te *CommandExecutor) Run(ctx context.Context, taskID TaskID, cmd api.Command) error { + logger := log.With().Str("task", string(taskID)).Str("command", cmd.Name).Logger() + logger.Info().Interface("settings", cmd.Settings).Msg("running command") + + return errors.New("command running not implemented") +} diff --git a/internal/worker/listener.go b/internal/worker/listener.go new file mode 100644 index 00000000..a2f3455b --- /dev/null +++ b/internal/worker/listener.go @@ -0,0 +1,97 @@ +package worker + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/rs/zerolog/log" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +var _ CommandListener = (*Listener)(nil) +var _ TaskExecutionListener = (*Listener)(nil) + +// Listener listens to the result of task and command execution, and sends it to the Manager. +type Listener struct { + doneWg *sync.WaitGroup + client api.ClientWithResponsesInterface +} + +// NewListener creates a new Listener that will send updates to the API client. +func NewListener(client api.ClientWithResponsesInterface) *Listener { + l := &Listener{ + doneWg: new(sync.WaitGroup), + client: client, + } + l.doneWg.Add(1) + return l +} + +func (l *Listener) Run(ctx context.Context) { + keepRunning := true + for keepRunning { + select { + case <-ctx.Done(): + keepRunning = false + continue + case <-time.After(10 * time.Second): + // This is just a dummy thing. + } + log.Debug().Msg("listener is still running") + } + + log.Debug().Msg("listener shutting down") + l.doneWg.Done() +} + +func (l *Listener) Wait() { + log.Debug().Msg("waiting for listener to shut down") + l.doneWg.Wait() +} + +// TaskStarted tells the Manager that task execution has started. +func (l *Listener) TaskStarted(taskID TaskID) error { + return errors.New("not implemented") +} + +// TaskFailed tells the Manager the task failed for some reason. +func (l *Listener) TaskFailed(taskID TaskID, reason string) error { + return errors.New("not implemented") +} + +// TaskCompleted tells the Manager the task has been completed. +func (l *Listener) TaskCompleted(taskID TaskID) error { + return errors.New("not implemented") +} + +// LogProduced sends any logging to whatever service for storing logging. +func (l *Listener) LogProduced(taskID TaskID, logLines []string) error { + return errors.New("not implemented") +} + +// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). +func (l *Listener) OutputProduced(taskID TaskID, outputLocation string) error { + return errors.New("not implemented") +} diff --git a/internal/worker/task_executor.go b/internal/worker/task_executor.go index 06b6eb19..c91c8777 100644 --- a/internal/worker/task_executor.go +++ b/internal/worker/task_executor.go @@ -22,31 +22,77 @@ package worker import ( "context" - "errors" - "time" + "fmt" "github.com/rs/zerolog/log" "gitlab.com/blender/flamenco-ng-poc/pkg/api" ) -type TaskExecutor struct{} +type CommandRunner interface { + Run(ctx context.Context, taskID TaskID, cmd api.Command) error +} + +type TaskExecutionListener interface { + // TaskStarted tells the Manager that task execution has started. + TaskStarted(taskID TaskID) error + + // TaskFailed tells the Manager the task failed for some reason. + TaskFailed(taskID TaskID, reason string) error + + // TaskCompleted tells the Manager the task has been completed. + TaskCompleted(taskID TaskID) error +} + +// TODO: move me to a more appropriate place. +type TaskID string + +type TaskExecutor struct { + cmdRunner CommandRunner + listener TaskExecutionListener +} var _ TaskRunner = (*TaskExecutor)(nil) +func NewTaskExecutor(cmdRunner CommandRunner, listener TaskExecutionListener) *TaskExecutor { + return &TaskExecutor{ + cmdRunner: cmdRunner, + listener: listener, + } +} + func (te *TaskExecutor) Run(ctx context.Context, task api.AssignedTask) error { logger := log.With().Str("task", task.Uuid).Logger() logger.Info().Str("taskType", task.TaskType).Msg("starting task") - for _, cmd := range task.Commands { - cmdLogger := logger.With().Str("command", cmd.Name).Interface("settings", cmd.Settings).Logger() - cmdLogger.Info().Msg("running command") + taskID := TaskID(task.Uuid) + if err := te.listener.TaskStarted(taskID); err != nil { + return fmt.Errorf("error sending notification to manager: %w", err) + } + + for _, cmd := range task.Commands { select { case <-ctx.Done(): - cmdLogger.Warn().Msg("command execution aborted due to context shutdown") - case <-time.After(1 * time.Second): - cmdLogger.Debug().Msg("mocked duration of command") + // Shutdown does not mean task failure; cleanly shutting down will hand + // back the task for requeueing on the Manager. + logger.Warn().Msg("task execution aborted due to context shutdown") + return nil + default: + } + + err := te.cmdRunner.Run(ctx, taskID, cmd) + + if err != nil { + if err := te.listener.TaskFailed(taskID, err.Error()); err != nil { + return fmt.Errorf("error sending notification to manager: %w", err) + } + return err } } - return errors.New("task running not implemented") + + if err := te.listener.TaskCompleted(taskID); err != nil { + return fmt.Errorf("error sending notification to manager: %w", err) + } + + return nil }