diff --git a/debug-job-echo.sh b/debug-job-echo.sh index c1ef866b..80dfc3b8 100755 --- a/debug-job-echo.sh +++ b/debug-job-echo.sh @@ -7,13 +7,14 @@ curl -X 'POST' \ -d '{ "metadata": { "project": "Debugging Flamenco", - "user.name": "コードモンキー" + "user.name": "dr. Sybren", + "duration": "long" }, - "name": "Talk & Sleep", - "priority": 50, + "name": "Talk & Sleep longer", + "priority": 3, "settings": { - "sleep_duration_seconds": 2, - "message": "{blender}" + "sleep_duration_seconds": 20, + "message": "Blender is {blender}" }, "type": "echo-sleep-test" }' diff --git a/internal/manager/api_impl/api_impl.go b/internal/manager/api_impl/api_impl.go index fba6ca71..3d6f72d9 100644 --- a/internal/manager/api_impl/api_impl.go +++ b/internal/manager/api_impl/api_impl.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "strconv" + "sync" "time" "git.blender.org/flamenco/internal/manager/job_compilers" @@ -29,6 +30,11 @@ type Flamenco struct { config ConfigService stateMachine TaskStateMachine shaman Shaman + + // The task scheduler can be locked to prevent multiple Workers from getting + // the same task. It is also used for certain other queries, like + // `MayWorkerRun` to prevent similar race conditions. + taskSchedulerMutex sync.Mutex } var _ api.ServerInterface = (*Flamenco)(nil) diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 6ef3925c..340390d8 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -4,6 +4,7 @@ package api_impl import ( "context" + "errors" "fmt" "net/http" "strings" @@ -15,6 +16,7 @@ import ( "golang.org/x/crypto/bcrypt" "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/task_state_machine" "git.blender.org/flamenco/pkg/api" ) @@ -249,6 +251,9 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { worker := requestWorkerOrPanic(e) logger.Debug().Msg("worker requesting task") + f.taskSchedulerMutex.Lock() + defer f.taskSchedulerMutex.Unlock() + // Check that this worker is actually allowed to do work. requiredStatusToGetTask := api.WorkerStatusAwake if worker.Status != api.WorkerStatusAwake { @@ -307,3 +312,58 @@ func (f *Flamenco) ScheduleTask(e echo.Context) error { customisedTask := replaceTaskVariables(f.config, apiTask, *worker) return e.JSON(http.StatusOK, customisedTask) } + +func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { + logger := requestLogger(e) + worker := requestWorkerOrPanic(e) + + if _, err := uuid.Parse(taskID); err != nil { + logger.Debug().Msg("invalid task ID received") + return sendAPIError(e, http.StatusBadRequest, "task ID not valid") + } + logger = logger.With().Str("task", taskID).Logger() + + // Lock the task scheduler so that tasks don't get reassigned while we perform our checks. + f.taskSchedulerMutex.Lock() + defer f.taskSchedulerMutex.Unlock() + + // Fetch the task, to see if this worker is allowed to run it. + ctx := e.Request().Context() + dbTask, err := f.persist.FetchTask(ctx, taskID) + if err != nil { + if errors.Is(err, persistence.ErrTaskNotFound) { + mkr := api.MayKeepRunning{Reason: "Task not found"} + return e.JSON(http.StatusOK, mkr) + } + logger.Error().Err(err).Msg("MayWorkerRun: cannot fetch task") + return sendAPIError(e, http.StatusInternalServerError, "error fetching task") + } + if dbTask == nil { + panic("task could not be fetched, but database gave no error either") + } + + mkr := mayWorkerRun(worker, dbTask) + + if mkr.MayKeepRunning { + // TODO: record that this worker "touched" this task, for timeout calculations. + } + + return e.JSON(http.StatusOK, mkr) +} + +// mayWorkerRun checks the worker and the task, to see if this worker may keep running this task. +func mayWorkerRun(worker *persistence.Worker, dbTask *persistence.Task) api.MayKeepRunning { + if worker.StatusRequested != "" { + return api.MayKeepRunning{ + Reason: "worker status change requested", + StatusChangeRequested: true, + } + } + if dbTask.WorkerID == nil || *dbTask.WorkerID != worker.ID { + return api.MayKeepRunning{Reason: "task not assigned to this worker"} + } + if !task_state_machine.IsRunnableTaskStatus(dbTask.Status) { + return api.MayKeepRunning{Reason: fmt.Sprintf("task is in non-runnable status %q", dbTask.Status)} + } + return api.MayKeepRunning{MayKeepRunning: true} +} diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index b512e92d..7d2edba0 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/labstack/echo/v4" "github.com/stretchr/testify/assert" "git.blender.org/flamenco/internal/manager/persistence" @@ -139,3 +140,80 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { resp := getRecordedResponse(echo) assert.Equal(t, http.StatusNoContent, resp.StatusCode) } + +func TestMayWorkerRun(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + worker := testWorker() + + prepareRequest := func() echo.Context { + echo := mf.prepareMockedRequest(nil) + requestWorkerStore(echo, &worker) + return echo + } + + job := persistence.Job{ + UUID: "583a7d59-887a-4c6c-b3e4-a753018f71b0", + } + + task := persistence.Task{ + UUID: "4107c7aa-e86d-4244-858b-6c4fce2af503", + Job: &job, + Status: api.TaskStatusActive, + } + + mf.persistence.EXPECT().FetchTask(gomock.Any(), task.UUID).Return(&task, nil).AnyTimes() + + // Test: unhappy, task unassigned + { + echo := prepareRequest() + err := mf.flamenco.MayWorkerRun(echo, task.UUID) + assert.NoError(t, err) + assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{ + MayKeepRunning: false, + Reason: "task not assigned to this worker", + }) + } + + // Test: happy, task assigned to this worker. + { + echo := prepareRequest() + task.WorkerID = &worker.ID + err := mf.flamenco.MayWorkerRun(echo, task.UUID) + assert.NoError(t, err) + assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{ + MayKeepRunning: true, + }) + } + + // Test: unhappy, assigned but cancelled. + { + echo := prepareRequest() + task.WorkerID = &worker.ID + task.Status = api.TaskStatusCanceled + err := mf.flamenco.MayWorkerRun(echo, task.UUID) + assert.NoError(t, err) + assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{ + MayKeepRunning: false, + Reason: "task is in non-runnable status \"canceled\"", + }) + } + + // Test: unhappy, assigned and runnable but worker should go to bed. + { + worker.StatusRequested = api.WorkerStatusAsleep + echo := prepareRequest() + task.WorkerID = &worker.ID + task.Status = api.TaskStatusActive + err := mf.flamenco.MayWorkerRun(echo, task.UUID) + assert.NoError(t, err) + assertResponseJSON(t, echo, http.StatusOK, api.MayKeepRunning{ + MayKeepRunning: false, + Reason: "worker status change requested", + StatusChangeRequested: true, + }) + } + +} diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index d985b180..69f23753 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -54,16 +54,6 @@ type ChangeBroadcaster interface { // ChangeBroadcaster should be a subset of webupdates.BiDirComms var _ ChangeBroadcaster = (*webupdates.BiDirComms)(nil) -var ( - // Task statuses that always get requeued when the job is requeued. - nonCompletedStatuses = []api.TaskStatus{ - api.TaskStatusCanceled, - api.TaskStatusFailed, - api.TaskStatusPaused, - api.TaskStatusSoftFailed, - } -) - func NewStateMachine(persist PersistenceService, broadcaster ChangeBroadcaster) *StateMachine { return &StateMachine{ persist: persist, diff --git a/internal/manager/task_state_machine/task_statuses.go b/internal/manager/task_state_machine/task_statuses.go new file mode 100644 index 00000000..73bf2753 --- /dev/null +++ b/internal/manager/task_state_machine/task_statuses.go @@ -0,0 +1,26 @@ +package task_state_machine + +import "git.blender.org/flamenco/pkg/api" + +var ( + // Task statuses that always get requeued when the job is requeued. + nonCompletedStatuses = []api.TaskStatus{ + api.TaskStatusCanceled, + api.TaskStatusFailed, + api.TaskStatusPaused, + api.TaskStatusSoftFailed, + } + + // Workers are allowed to keep running tasks when they are in this status. + // 'queued', 'claimed-by-manager', and 'soft-failed' aren't considered runnable, + // as those statuses indicate the task wasn't assigned to a Worker by the scheduler. + runnableStatuses = map[api.TaskStatus]bool{ + api.TaskStatusActive: true, + } +) + +// IsRunnableTaskStatus returns whether the given status is considered "runnable". +// In other words, workers are allowed to keep running such tasks. +func IsRunnableTaskStatus(status api.TaskStatus) bool { + return runnableStatuses[status] +} diff --git a/internal/worker/state_asleep.go b/internal/worker/state_asleep.go index feaf0970..f569c6e4 100644 --- a/internal/worker/state_asleep.go +++ b/internal/worker/state_asleep.go @@ -35,10 +35,7 @@ func (w *Worker) runStateAsleep(ctx context.Context) { logger.Debug().Msg("asleep state interrupted by shutdown") return case <-time.After(durationSleepCheck): - newStatus := w.queryManagerForStateChange(ctx) - if newStatus != nil { - logger.Debug().Str("newStatus", string(*newStatus)).Msg("asleep state interrupted by state change") - w.changeState(ctx, *newStatus) + if w.changeStateIfRequested(ctx) { return } } diff --git a/internal/worker/state_awake.go b/internal/worker/state_awake.go index 99efdc94..0db625bc 100644 --- a/internal/worker/state_awake.go +++ b/internal/worker/state_awake.go @@ -6,6 +6,7 @@ import ( "context" "errors" "net/http" + "sync" "time" "github.com/rs/zerolog/log" @@ -18,8 +19,26 @@ const ( durationNoTask = 2 * time.Second // ... if there is no task now. durationFetchFailed = 10 * time.Second // ... if fetching failed somehow. durationTaskComplete = 2 * time.Second // ... when a task was completed. + + mayKeepRunningPeriod = 1 * time.Second ) +// Implement error interface for `api.MayKeepRunning` to indicate a task run was +// aborted due to the Manager saying "NO". +type taskRunAborted api.MayKeepRunning + +func (tra taskRunAborted) Error() string { + switch { + case tra.MayKeepRunning: + return "task could have been kept running" + case tra.StatusChangeRequested: + return "worker status change requested" + case tra.Reason == "": + return "manager said NO" + } + return tra.Reason +} + func (w *Worker) gotoStateAwake(ctx context.Context) { w.stateMutex.Lock() w.state = api.WorkerStatusAwake @@ -55,9 +74,15 @@ func (w *Worker) runStateAwake(ctx context.Context) { // The task runner's listener will be responsible for sending results back // to the Manager. This code only needs to fetch a task and run it. - err := w.taskRunner.Run(ctx, *task) + err := w.runTask(ctx, *task) if err != nil { - if errors.Is(err, context.Canceled) { + var abortError taskRunAborted + if errors.As(err, &abortError) { + log.Warn(). + Str("task", task.Uuid). + Str("reason", err.Error()). + Msg("task aborted by request of Manager") + } else if errors.Is(err, context.Canceled) { log.Warn().Interface("task", *task).Msg("task aborted due to context being closed") } else { log.Warn().Err(err).Interface("task", *task).Msg("error executing task") @@ -128,3 +153,53 @@ func (w *Worker) fetchTask(ctx context.Context) *api.AssignedTask { } } + +// runTask runs the given task. +func (w *Worker) runTask(ctx context.Context, task api.AssignedTask) error { + // Create a sub-context to manage the life-span of both the running of the + // task and the loop to check whether we're still allowed to run it. + taskCtx, taskCancel := context.WithCancel(ctx) + defer taskCancel() + + var taskRunnerErr, abortReason error + + // Run the actual task in a separate goroutine. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + defer taskCancel() + taskRunnerErr = w.taskRunner.Run(taskCtx, task) + }() + + // Do a periodic check to see if we're actually allowed to run this task. +checkloop: + for { + select { + case <-taskCtx.Done(): + // The task is done, no more need to check. + break checkloop + case <-time.After(mayKeepRunningPeriod): + // Time to do another check. + break + } + + mkr := w.mayIKeepRunning(taskCtx, task.Uuid) + if mkr.MayKeepRunning { + continue + } + + abortReason = taskRunAborted(mkr) + taskCancel() + break checkloop + } + + // Wait for the task runner to either complete or abort. + wg.Wait() + + if abortReason != nil { + return abortReason + } + + return taskRunnerErr +} diff --git a/internal/worker/statemachine.go b/internal/worker/statemachine.go index 6416c780..da4676b0 100644 --- a/internal/worker/statemachine.go +++ b/internal/worker/statemachine.go @@ -36,6 +36,24 @@ func (w *Worker) changeState(ctx context.Context, newState api.WorkerStatus) { starter(ctx) } +// changeStateIfRequested asks the Manager whether a status change is required +// or not, and if so, goes to that state. +// Returns `true` when the status was changed, so that the caller knows to stop +// whatever it's doing. +func (w *Worker) changeStateIfRequested(ctx context.Context) bool { + newStatus := w.queryManagerForStateChange(ctx) + if newStatus == nil { + return false + } + + log.Info(). + Str("currentStatus", string(w.state)). + Str("newStatus", string(*newStatus)). + Msg("Manager requested state change") + w.changeState(ctx, *newStatus) + return true +} + // Confirm that we're now in a certain state. // // This ACK can be given without a request from the server, for example to support diff --git a/internal/worker/statemonitor.go b/internal/worker/statemonitor.go index c4ae94a7..fc3632dd 100644 --- a/internal/worker/statemonitor.go +++ b/internal/worker/statemonitor.go @@ -37,3 +37,38 @@ func (w *Worker) queryManagerForStateChange(ctx context.Context) *api.WorkerStat return nil } + +// mayIKeepRunning asks the Manager whether we can keep running a certain task. +// Any error communicating with the Manager is logged but otherwise ignored. +func (w *Worker) mayIKeepRunning(ctx context.Context, taskID string) api.MayKeepRunning { + resp, err := w.client.MayWorkerRunWithResponse(ctx, taskID) + if err != nil { + log.Warn(). + Err(err). + Str("task", taskID). + Msg("error asking Manager may-I-keep-running task") + return api.MayKeepRunning{MayKeepRunning: true} + } + + switch { + case resp.JSON200 != nil: + mkr := *resp.JSON200 + logCtx := log.With(). + Str("task", taskID). + Bool("mayKeepRunning", mkr.MayKeepRunning). + Bool("statusChangeRequested", mkr.StatusChangeRequested) + if mkr.Reason != "" { + logCtx = logCtx.Str("reason", mkr.Reason) + } + logger := logCtx.Logger() + logger.Debug().Msg("may-i-keep-running response") + return mkr + default: + log.Warn(). + Str("task", taskID). + Int("code", resp.StatusCode()). + Str("error", string(resp.Body)). + Msg("unable to check may-i-keep-running for unknown reason") + return api.MayKeepRunning{MayKeepRunning: true} + } +} diff --git a/web/app/src/main.js b/web/app/src/main.js index 1cd18e55..069e0ee5 100644 --- a/web/app/src/main.js +++ b/web/app/src/main.js @@ -18,3 +18,7 @@ const pinia = createPinia() app.use(pinia) app.use(router) app.mount('#app') + +// For debugging. +import { useJobs } from '@/stores/jobs'; +window.jobs = useJobs(); diff --git a/web/app/src/views/JobsView.vue b/web/app/src/views/JobsView.vue index 1f1f778b..6b05c261 100644 --- a/web/app/src/views/JobsView.vue +++ b/web/app/src/views/JobsView.vue @@ -46,6 +46,18 @@ export default { }), mounted() { window.jobsView = this; + + this.jobs.$subscribe((mutation, state) => { + console.log("Pinia mutation:", mutation) + console.log("Pinia state :", state) + // // import { MutationType } from 'pinia' + // mutation.type // 'direct' | 'patch object' | 'patch function' + // // same as cartStore.$id + // mutation.storeId // 'cart' + // // only available with mutation.type === 'patch object' + // mutation.payload // patch object passed to cartStore.$patch() + }) + this._fetchJob(this.jobID); this._fetchTask(this.taskID); },