diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index 4f9b00c8..0cab1ccc 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -180,14 +180,24 @@ func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog. logger.Info(). Str("task", task.UUID). Msg("re-queueing task") - err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusQueued) - if err != nil { + + // Write to task activity that it got requeued because of worker sign-off. + task.Activity = "Task requeued because worked signed off" + if err := f.persist.SaveTaskActivity(ctx, task); err != nil { logger.Warn().Err(err). Str("task", task.UUID). Msg("error queueing task on worker sign-off") lastErr = err } - // TODO: write to task activity that it got requeued because of worker sign-off. + + if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusQueued); err != nil { + logger.Warn().Err(err). + Str("task", task.UUID). + Msg("error queueing task on worker sign-off") + lastErr = err + } + + f.taskLogAppend(logger, task, "Task was requeued by Manager because the worker assigned to it signed off.\n") } return lastErr @@ -397,17 +407,7 @@ func (f *Flamenco) doTaskUpdate( } if update.Log != nil { - // Errors writing the log to file should be logged in our own logging - // system, but shouldn't abort the render. As such, `err` is not returned to - // the caller. - err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log) - if err != nil { - logger.Error().Err(err).Msg("error writing task log") - } - - // Broadcast the task log to SocketIO clients. - taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, *update.Log) - f.broadcaster.BroadcastTaskLogUpdate(taskUpdate) + f.taskLogAppend(logger, dbTask, *update.Log) } // Any error updating the status is more important than an error updating the @@ -418,6 +418,21 @@ func (f *Flamenco) doTaskUpdate( return dbErrActivity } +// taskLogAppend appends a chunk of log lines to the task's log, and broadcasts it over SocketIO. +func (f *Flamenco) taskLogAppend(logger zerolog.Logger, dbTask *persistence.Task, logChunk string) { + // Errors writing the log to file should be logged in our own logging + // system, but shouldn't ripple up. As such, `err` is not returned to + // the caller. + err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, logChunk) + if err != nil { + logger.Error().Err(err).Msg("error writing task log") + } + + // Broadcast the task log to SocketIO clients. + taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, logChunk) + f.broadcaster.BroadcastTaskLogUpdate(taskUpdate) +} + func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { logger := requestLogger(e) worker := requestWorkerOrPanic(e) diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index ec1c93fb..243969f1 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -126,6 +126,15 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) { mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task1, api.TaskStatusQueued) mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task2, api.TaskStatusQueued) + // Expect this re-queueing to end up in the task's log and activity. + mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task1) // TODO: test saved activity value + mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task2) // TODO: test saved activity value + logMsg := "Task was requeued by Manager because the worker assigned to it signed off.\n" + mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task1.UUID, logMsg) + mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task2.UUID, logMsg) + mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task1.UUID, Log: logMsg}) + mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task2.UUID, Log: logMsg}) + // Expect worker to be saved as 'offline'. mf.persistence.EXPECT(). SaveWorkerStatus(expectCtx, &worker).