From 3c622264a47b21df8c2c8a810e5fc470e0b44873 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Thu, 19 May 2022 14:27:42 +0200 Subject: [PATCH] Manager: include 'activity' in SocketIO task updates This also changes the order in which the task is updated; the activity is now saved first, so that it can be included in the task status change notification sent to SocketIO clients. --- internal/manager/api_impl/workers.go | 23 ++++++++++++++-------- internal/manager/api_impl/workers_test.go | 4 ++-- internal/manager/webupdates/job_updates.go | 12 +++++------ 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/internal/manager/api_impl/workers.go b/internal/manager/api_impl/workers.go index f5623db6..1fbda706 100644 --- a/internal/manager/api_impl/workers.go +++ b/internal/manager/api_impl/workers.go @@ -373,7 +373,14 @@ func (f *Flamenco) doTaskUpdate( logger.Panic().Msg("dbTask.Job is nil, unable to continue") } - var dbErr error + var dbErrActivity, dbErrStatus error + + if update.Activity != nil { + dbTask.Activity = *update.Activity + // The state machine will also save the task, including its activity, but + // relying on that here would create strong cohesion. + dbErrActivity = f.persist.SaveTaskActivity(ctx, dbTask) + } if update.TaskStatus != nil { oldTaskStatus := dbTask.Status @@ -383,16 +390,11 @@ func (f *Flamenco) doTaskUpdate( Str("newTaskStatus", string(*update.TaskStatus)). Str("oldTaskStatus", string(oldTaskStatus)). Msg("error changing task status") - dbErr = fmt.Errorf("changing status of task %s to %q: %w", + dbErrStatus = fmt.Errorf("changing status of task %s to %q: %w", dbTask.UUID, *update.TaskStatus, err) } } - if update.Activity != nil { - dbTask.Activity = *update.Activity - dbErr = f.persist.SaveTaskActivity(ctx, dbTask) - } - if update.Log != nil { // Errors writing the log to file should be logged in our own logging // system, but shouldn't abort the render. As such, `err` is not returned to @@ -403,7 +405,12 @@ func (f *Flamenco) doTaskUpdate( } } - return dbErr + // Any error updating the status is more important than an error updating the + // activity. + if dbErrStatus != nil { + return dbErrStatus + } + return dbErrActivity } func (f *Flamenco) MayWorkerRun(e echo.Context, taskID string) error { diff --git a/internal/manager/api_impl/workers_test.go b/internal/manager/api_impl/workers_test.go index 234fd28a..ddf92df6 100644 --- a/internal/manager/api_impl/workers_test.go +++ b/internal/manager/api_impl/workers_test.go @@ -198,8 +198,8 @@ func TestTaskUpdate(t *testing.T) { assert.NoError(t, err) assert.Equal(t, mockTask.UUID, statusChangedtask.UUID) assert.Equal(t, mockTask.UUID, actUpdatedTask.UUID) - assert.Equal(t, "pre-update activity", statusChangedtask.Activity) // the 'save' should come from the change in status. - assert.Equal(t, "testing", actUpdatedTask.Activity) // the activity should be saved separately. + assert.Equal(t, "testing", statusChangedtask.Activity) + assert.Equal(t, "testing", actUpdatedTask.Activity) } func TestMayWorkerRun(t *testing.T) { diff --git a/internal/manager/webupdates/job_updates.go b/internal/manager/webupdates/job_updates.go index c7dd5bd3..7a060f2d 100644 --- a/internal/manager/webupdates/job_updates.go +++ b/internal/manager/webupdates/job_updates.go @@ -32,14 +32,14 @@ func NewJobUpdate(job *persistence.Job) api.JobUpdate { // Assumes task.Job is not nil. func NewTaskUpdate(task *persistence.Task) api.SocketIOTaskUpdate { taskUpdate := api.SocketIOTaskUpdate{ - Id: task.UUID, - JobId: task.Job.UUID, - Name: task.Name, - Updated: task.UpdatedAt, - Status: task.Status, + Id: task.UUID, + JobId: task.Job.UUID, + Name: task.Name, + Updated: task.UpdatedAt, + Status: task.Status, + Activity: task.Activity, } return taskUpdate - } // BroadcastJobUpdate sends the job update to clients.