Manager: on worker signoff, add a note to any requeued task logs

When a worker signs off, its tasks get requeued. This is now also saved in
the task log, and broadcast via SocketIO as task log chunk.
This commit is contained in:
Sybren A. Stüvel 2022-05-20 14:17:17 +02:00
parent 64e9f7cbbe
commit 792b4ab141
2 changed files with 38 additions and 14 deletions

View File

@ -180,14 +180,24 @@ func (f *Flamenco) workerRequeueActiveTasks(ctx context.Context, logger zerolog.
logger.Info().
Str("task", task.UUID).
Msg("re-queueing task")
err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusQueued)
if err != nil {
// Write to task activity that it got requeued because of worker sign-off.
task.Activity = "Task requeued because worked signed off"
if err := f.persist.SaveTaskActivity(ctx, task); err != nil {
logger.Warn().Err(err).
Str("task", task.UUID).
Msg("error queueing task on worker sign-off")
lastErr = err
}
// TODO: write to task activity that it got requeued because of worker sign-off.
if err := f.stateMachine.TaskStatusChange(ctx, task, api.TaskStatusQueued); err != nil {
logger.Warn().Err(err).
Str("task", task.UUID).
Msg("error queueing task on worker sign-off")
lastErr = err
}
f.taskLogAppend(logger, task, "Task was requeued by Manager because the worker assigned to it signed off.\n")
}
return lastErr
@ -397,17 +407,7 @@ func (f *Flamenco) doTaskUpdate(
}
if update.Log != nil {
// Errors writing the log to file should be logged in our own logging
// system, but shouldn't abort the render. As such, `err` is not returned to
// the caller.
err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, *update.Log)
if err != nil {
logger.Error().Err(err).Msg("error writing task log")
}
// Broadcast the task log to SocketIO clients.
taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, *update.Log)
f.broadcaster.BroadcastTaskLogUpdate(taskUpdate)
f.taskLogAppend(logger, dbTask, *update.Log)
}
// Any error updating the status is more important than an error updating the
@ -418,6 +418,21 @@ func (f *Flamenco) doTaskUpdate(
return dbErrActivity
}
// taskLogAppend appends a chunk of log lines to the task's log, and broadcasts it over SocketIO.
func (f *Flamenco) taskLogAppend(logger zerolog.Logger, dbTask *persistence.Task, logChunk string) {
// Errors writing the log to file should be logged in our own logging
// system, but shouldn't ripple up. As such, `err` is not returned to
// the caller.
err := f.logStorage.Write(logger, dbTask.Job.UUID, dbTask.UUID, logChunk)
if err != nil {
logger.Error().Err(err).Msg("error writing task log")
}
// Broadcast the task log to SocketIO clients.
taskUpdate := webupdates.NewTaskLogUpdate(dbTask.UUID, logChunk)
f.broadcaster.BroadcastTaskLogUpdate(taskUpdate)
}
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {
logger := requestLogger(e)
worker := requestWorkerOrPanic(e)

View File

@ -126,6 +126,15 @@ func TestWorkerSignoffTaskRequeue(t *testing.T) {
mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task1, api.TaskStatusQueued)
mf.stateMachine.EXPECT().TaskStatusChange(expectCtx, &task2, api.TaskStatusQueued)
// Expect this re-queueing to end up in the task's log and activity.
mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task1) // TODO: test saved activity value
mf.persistence.EXPECT().SaveTaskActivity(expectCtx, &task2) // TODO: test saved activity value
logMsg := "Task was requeued by Manager because the worker assigned to it signed off.\n"
mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task1.UUID, logMsg)
mf.logStorage.EXPECT().Write(gomock.Any(), job.UUID, task2.UUID, logMsg)
mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task1.UUID, Log: logMsg})
mf.broadcaster.EXPECT().BroadcastTaskLogUpdate(api.SocketIOTaskLogUpdate{TaskId: task2.UUID, Log: logMsg})
// Expect worker to be saved as 'offline'.
mf.persistence.EXPECT().
SaveWorkerStatus(expectCtx, &worker).