diff --git a/cmd/flamenco-worker-poc/main.go b/cmd/flamenco-worker-poc/main.go index c54df055..024560f1 100644 --- a/cmd/flamenco-worker-poc/main.go +++ b/cmd/flamenco-worker-poc/main.go @@ -29,6 +29,7 @@ import ( "syscall" "time" + "github.com/benbjohnson/clock" "github.com/mattn/go-colorable" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -70,7 +71,8 @@ func main() { workerCtx, workerCtxCancel := context.WithCancel(context.Background()) listener = worker.NewListener(client) - cmdRunner := worker.NewCommandExecutor(listener) + timeService := clock.New() + cmdRunner := worker.NewCommandExecutor(listener, timeService) taskRunner := worker.NewTaskExecutor(cmdRunner, listener) w = worker.NewWorker(client, taskRunner) diff --git a/internal/worker/command_executor.go b/internal/worker/command_executor.go index 5de9dca0..28d4bd27 100644 --- a/internal/worker/command_executor.go +++ b/internal/worker/command_executor.go @@ -33,7 +33,7 @@ import ( type CommandListener interface { // LogProduced sends any logging to whatever service for storing logging. - LogProduced(taskID TaskID, logLines []string) error + LogProduced(taskID TaskID, logLines ...string) error // OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). OutputProduced(taskID TaskID, outputLocation string) error } @@ -42,20 +42,33 @@ type CommandExecutor struct { listener CommandListener // registry maps a command name to a function that runs that command. registry map[string]commandCallable + + timeService TimeService } var _ CommandRunner = (*CommandExecutor)(nil) type commandCallable func(ctx context.Context, logger zerolog.Logger, taskID TaskID, cmd api.Command) error -func NewCommandExecutor(listener CommandListener) *CommandExecutor { +// TimeService is a service that operates on time. +type TimeService interface { + After(duration time.Duration) <-chan time.Time +} + +func NewCommandExecutor(listener CommandListener, timeService TimeService) *CommandExecutor { ce := &CommandExecutor{ - listener: listener, + listener: listener, + timeService: timeService, } + + // Registry of supported commands. Having this as a map (instead of a big + // switch statement) makes it possible to do things like reporting the list of + // supported commands. ce.registry = map[string]commandCallable{ "echo": ce.cmdEcho, "sleep": ce.cmdSleep, } + return ce } @@ -71,6 +84,7 @@ func (ce *CommandExecutor) Run(ctx context.Context, taskID TaskID, cmd api.Comma return runner(ctx, logger, taskID, cmd) } +// cmdEcho executes the "echo" command. func (ce *CommandExecutor) cmdEcho(ctx context.Context, logger zerolog.Logger, taskID TaskID, cmd api.Command) error { message, ok := cmd.Settings["message"] if !ok { @@ -79,16 +93,13 @@ func (ce *CommandExecutor) cmdEcho(ctx context.Context, logger zerolog.Logger, t messageStr := fmt.Sprintf("%v", message) logger.Info().Str("message", messageStr).Msg("echo") - logLines := []string{ - fmt.Sprintf("echo: %q", messageStr), - } - - if err := ce.listener.LogProduced(taskID, logLines); err != nil { + if err := ce.listener.LogProduced(taskID, fmt.Sprintf("echo: %q", messageStr)); err != nil { return err } return nil } +// cmdSleep executes the "sleep" command. func (ce *CommandExecutor) cmdSleep(ctx context.Context, logger zerolog.Logger, taskID TaskID, cmd api.Command) error { sleepTime, ok := cmd.Settings["time_in_seconds"] @@ -106,7 +117,19 @@ func (ce *CommandExecutor) cmdSleep(ctx context.Context, logger zerolog.Logger, } log.Info().Str("duration", duration.String()).Msg("sleep") - time.Sleep(duration) + + select { + case <-ctx.Done(): + err := ctx.Err() + log.Warn().Err(err).Msg("sleep aborted because context closed") + return fmt.Errorf("sleep aborted because context closed: %w", err) + case <-ce.timeService.After(duration): + log.Debug().Msg("sleeping done") + } + + if err := ce.listener.LogProduced(taskID, fmt.Sprintf("slept %v", duration)); err != nil { + return err + } return nil } diff --git a/internal/worker/command_executor_test.go b/internal/worker/command_executor_test.go new file mode 100644 index 00000000..9af3ae6b --- /dev/null +++ b/internal/worker/command_executor_test.go @@ -0,0 +1,132 @@ +package worker + +/* ***** BEGIN GPL LICENSE BLOCK ***** + * + * Original Code Copyright (C) 2022 Blender Foundation. + * + * This file is part of Flamenco. + * + * Flamenco is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation, either version 3 of the License, or (at your option) any later + * version. + * + * Flamenco is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + * A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * Flamenco. If not, see . + * + * ***** END GPL LICENSE BLOCK ***** */ + +import ( + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/stretchr/testify/assert" + "gitlab.com/blender/flamenco-ng-poc/pkg/api" +) + +type mockCommandListener struct { + log []loggedLines + output []producedOutput +} +type loggedLines struct { + taskID TaskID + logLines []string +} +type producedOutput struct { + taskID TaskID + outputLocation string +} + +// LogProduced sends any logging to whatever service for storing logging. +func (ml *mockCommandListener) LogProduced(taskID TaskID, logLines ...string) error { + ml.log = append(ml.log, loggedLines{taskID, logLines}) + return nil +} + +// OutputProduced tells the Manager there has been some output (most commonly a rendered frame or video). +func (ml *mockCommandListener) OutputProduced(taskID TaskID, outputLocation string) error { + ml.output = append(ml.output, producedOutput{taskID, outputLocation}) + return nil +} + +func mockedClock(t *testing.T) *clock.Mock { + c := clock.NewMock() + now, err := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00") + assert.NoError(t, err) + c.Set(now) + return c +} + +func TestCommandEcho(t *testing.T) { + l := mockCommandListener{} + clock := mockedClock(t) + ce := NewCommandExecutor(&l, clock) + + ctx := context.Background() + message := "понављај за мном" + taskID := TaskID("90e9d656-e201-4ef0-b6b0-c80684fafa27") + cmd := api.Command{ + Name: "echo", + Settings: map[string]interface{}{"message": message}, + } + + err := ce.Run(ctx, taskID, cmd) + assert.NoError(t, err) + + assert.Len(t, l.log, 1) + assert.Equal(t, taskID, l.log[0].taskID) + assert.Equal(t, "echo: \"понављај за мном\"", l.log[0].logLines[0]) + assert.Len(t, l.output, 0) +} + +func TestCommandSleep(t *testing.T) { + l := mockCommandListener{} + clock := mockedClock(t) + ce := NewCommandExecutor(&l, clock) + + ctx := context.Background() + taskID := TaskID("90e9d656-e201-4ef0-b6b0-c80684fafa27") + cmd := api.Command{ + Name: "sleep", + Settings: map[string]interface{}{"time_in_seconds": 47}, + } + + timeBefore := clock.Now() + + // Run the test in a goroutine, as we also need to actually increase the + // mocked clock at the same time; without that, the command will sleep + // indefinitely. + runDone := make(chan struct{}) + var err error + go func() { + err = ce.Run(ctx, taskID, cmd) + close(runDone) + }() + + timeStepSize := 1 * time.Second +loop: + for { + select { + case <-runDone: + break loop + default: + clock.Add(timeStepSize) + } + } + + assert.NoError(t, err) + timeAfter := clock.Now() + // Within the step size is precise enough. We're testing our implementation, not the precision of `time.After()`. + assert.WithinDuration(t, timeBefore.Add(47*time.Second), timeAfter, timeStepSize) + + assert.Len(t, l.log, 1) + assert.Equal(t, taskID, l.log[0].taskID) + assert.Equal(t, "slept 47s", l.log[0].logLines[0]) + assert.Len(t, l.output, 0) +} diff --git a/internal/worker/listener.go b/internal/worker/listener.go index a2f3455b..e99b80ae 100644 --- a/internal/worker/listener.go +++ b/internal/worker/listener.go @@ -87,7 +87,7 @@ func (l *Listener) TaskCompleted(taskID TaskID) error { } // LogProduced sends any logging to whatever service for storing logging. -func (l *Listener) LogProduced(taskID TaskID, logLines []string) error { +func (l *Listener) LogProduced(taskID TaskID, logLines ...string) error { return errors.New("not implemented") }