From b66490831c8b53227cbf97f45416cb53a4236e53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Tue, 28 May 2024 16:07:17 +0200 Subject: [PATCH] Manager: fetch jobs of tasks in FetchTasksOfWorkerInStatus() The task state machine expects that `task.Job` is set correctly. Since SQLC does not automatically fill this field (and rightfully so), I've added a bit of Go code that fetches the job in a separate query. A TODO is added as a reminder that it would be better for the task state machine itself to fetch the job when needed. --- internal/manager/persistence/jobs.go | 20 ++++++++++++++++++- .../task_state_machine/task_state_machine.go | 15 ++++++++------ .../task_state_machine_test.go | 16 ++++++++------- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 64f9e833..4c761a87 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -650,14 +650,32 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus) } + jobCache := make(map[uint]*Job) + result := make([]*Task, len(rows)) for i := range rows { - gormTask, err := convertSqlcTask(rows[i].Task, rows[i].JobUUID.String, worker.UUID) + jobUUID := rows[i].JobUUID.String + gormTask, err := convertSqlcTask(rows[i].Task, jobUUID, worker.UUID) if err != nil { return nil, err } gormTask.Worker = worker gormTask.WorkerID = &worker.ID + + // Fetch the job, either from the cache or from the database. This is done + // here because the task_state_machine functionality expects that task.Job + // is set. + // TODO: make that code fetch the job details it needs, rather than fetching + // the entire job here. + job := jobCache[gormTask.JobID] + if job == nil { + job, err = db.FetchJob(ctx, jobUUID) + if err != nil { + return nil, jobError(err, "finding job %s of task %s", jobUUID, gormTask.UUID) + } + } + gormTask.Job = job + result[i] = gormTask } return result, nil diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index 38b02659..2bdb673e 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -59,9 +59,8 @@ func (sm *StateMachine) taskStatusChangeOnly( task *persistence.Task, newTaskStatus api.TaskStatus, ) error { - job := task.Job - if job == nil { - log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this") + if task.JobUUID == "" { + log.Panic().Str("task", task.UUID).Msg("task without job UUID, cannot handle this") return nil // Will not run because of the panic. } @@ -70,7 +69,7 @@ func (sm *StateMachine) taskStatusChangeOnly( logger := log.With(). Str("task", task.UUID). - Str("job", job.UUID). + Str("job", task.JobUUID). Str("taskStatusOld", string(oldTaskStatus)). Str("taskStatusNew", string(newTaskStatus)). Logger() @@ -83,7 +82,7 @@ func (sm *StateMachine) taskStatusChangeOnly( if oldTaskStatus != newTaskStatus { // logStorage already logs any error, and an error here shouldn't block the // rest of the function. - _ = sm.logStorage.WriteTimestamped(logger, job.UUID, task.UUID, + _ = sm.logStorage.WriteTimestamped(logger, task.JobUUID, task.UUID, fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus)) } @@ -101,9 +100,13 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( ctx context.Context, task *persistence.Task, oldTaskStatus api.TaskStatus, ) error { job := task.Job + if job == nil { + log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this") + return nil // Will not run because of the panic. + } logger := log.With(). - Str("job", job.UUID). + Str("job", task.JobUUID). Str("task", task.UUID). Str("taskStatusOld", string(oldTaskStatus)). Str("taskStatusNew", string(task.Status)). diff --git a/internal/manager/task_state_machine/task_state_machine_test.go b/internal/manager/task_state_machine/task_state_machine_test.go index d9df1b3a..e936bd23 100644 --- a/internal/manager/task_state_machine/task_state_machine_test.go +++ b/internal/manager/task_state_machine/task_state_machine_test.go @@ -473,8 +473,9 @@ func taskWithStatus(jobStatus api.JobStatus, taskStatus api.TaskStatus) *persist Model: persistence.Model{ID: 327}, UUID: "testtask-0001-4e28-aeea-8cbaf2fc96a5", - JobID: job.ID, - Job: &job, + JobUUID: job.UUID, + JobID: job.ID, + Job: &job, Status: taskStatus, } @@ -486,11 +487,12 @@ func taskWithStatus(jobStatus api.JobStatus, taskStatus api.TaskStatus) *persist func taskOfSameJob(task *persistence.Task, taskStatus api.TaskStatus) *persistence.Task { newTaskID := task.ID + 1 return &persistence.Task{ - Model: persistence.Model{ID: newTaskID}, - UUID: fmt.Sprintf("testtask-%04d-4e28-aeea-8cbaf2fc96a5", newTaskID), - JobID: task.JobID, - Job: task.Job, - Status: taskStatus, + Model: persistence.Model{ID: newTaskID}, + UUID: fmt.Sprintf("testtask-%04d-4e28-aeea-8cbaf2fc96a5", newTaskID), + JobUUID: task.JobUUID, + JobID: task.JobID, + Job: task.Job, + Status: taskStatus, } }