Manager: include 'activity' in SocketIO task updates

This also changes the order in which the task is updated; the activity is
now saved first, so that it can be included in the task status change
notification sent to SocketIO clients.
This commit is contained in:
Sybren A. Stüvel 2022-05-19 14:27:42 +02:00
parent 797dea85ed
commit 3c622264a4
3 changed files with 23 additions and 16 deletions

View File

@ -373,7 +373,14 @@ func (f *Flamenco) doTaskUpdate(
logger.Panic().Msg("dbTask.Job is nil, unable to continue") logger.Panic().Msg("dbTask.Job is nil, unable to continue")
} }
var dbErr error var dbErrActivity, dbErrStatus error
if update.Activity != nil {
dbTask.Activity = *update.Activity
// The state machine will also save the task, including its activity, but
// relying on that here would create strong cohesion.
dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask)
}
if update.TaskStatus != nil { if update.TaskStatus != nil {
oldTaskStatus := dbTask.Status oldTaskStatus := dbTask.Status
@ -383,16 +390,11 @@ func (f *Flamenco) doTaskUpdate(
Str("newTaskStatus", string(*update.TaskStatus)). Str("newTaskStatus", string(*update.TaskStatus)).
Str("oldTaskStatus", string(oldTaskStatus)). Str("oldTaskStatus", string(oldTaskStatus)).
Msg("error changing task status") Msg("error changing task status")
dbErr = fmt.Errorf("changing status of task %s to %q: %w", dbErrStatus = fmt.Errorf("changing status of task %s to %q: %w",
dbTask.UUID, *update.TaskStatus, err) dbTask.UUID, *update.TaskStatus, err)
} }
} }
if update.Activity != nil {
dbTask.Activity = *update.Activity
dbErr = f.persist.SaveTaskActivity(ctx, dbTask)
}
if update.Log != nil { if update.Log != nil {
// Errors writing the log to file should be logged in our own logging // Errors writing the log to file should be logged in our own logging
// system, but shouldn't abort the render. As such, `err` is not returned to // system, but shouldn't abort the render. As such, `err` is not returned to
@ -403,7 +405,12 @@ func (f *Flamenco) doTaskUpdate(
} }
} }
return dbErr // Any error updating the status is more important than an error updating the
// activity.
if dbErrStatus != nil {
return dbErrStatus
}
return dbErrActivity
} }
func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error {

View File

@ -198,8 +198,8 @@ func TestTaskUpdate(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, mockTask.UUID, statusChangedtask.UUID) assert.Equal(t, mockTask.UUID, statusChangedtask.UUID)
assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID) assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID)
assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status. assert.Equal(t, "testing", statusChangedtask.Activity)
assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately. assert.Equal(t, "testing", actUpdatedTask.Activity)
} }
func TestMayWorkerRun(t *testing.T) { func TestMayWorkerRun(t *testing.T) {

View File

@ -32,14 +32,14 @@ func NewJobUpdate(job *persistence.Job) api.JobUpdate {
// Assumes task.Job is not nil. // Assumes task.Job is not nil.
func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate { func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate {
taskUpdate := api.SocketIOTaskUpdate{ taskUpdate := api.SocketIOTaskUpdate{
Id: task.UUID, Id: task.UUID,
JobId: task.Job.UUID, JobId: task.Job.UUID,
Name: task.Name, Name: task.Name,
Updated: task.UpdatedAt, Updated: task.UpdatedAt,
Status: task.Status, Status: task.Status,
Activity: task.Activity,
} }
return taskUpdate return taskUpdate
} }
// BroadcastJobUpdate sends the job update to clients. // BroadcastJobUpdate sends the job update to clients.