Manager: fetch jobs of tasks in FetchTasksOfWorkerInStatus()

The task state machine expects that `task.Job` is set correctly. Since
SQLC does not automatically fill this field (and rightfully so), I've added
a bit of Go code that fetches the job in a separate query.

A TODO is added as a reminder that it would be better for the task state
machine itself to fetch the job when needed.
This commit is contained in:
Sybren A. Stüvel 2024-05-28 16:07:17 +02:00
parent 1e327c510e
commit b66490831c
3 changed files with 37 additions and 14 deletions

View File

@ -650,14 +650,32 @@ func (db *DB) FetchTasksOfWorkerInStatus(ctx context.Context, worker *Worker, ta
return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus) return nil, taskError(err, "finding tasks of worker %s in status %q", worker.UUID, taskStatus)
} }
jobCache := make(map[uint]*Job)
result := make([]*Task, len(rows)) result := make([]*Task, len(rows))
for i := range rows { for i := range rows {
gormTask, err := convertSqlcTask(rows[i].Task, rows[i].JobUUID.String, worker.UUID) jobUUID := rows[i].JobUUID.String
gormTask, err := convertSqlcTask(rows[i].Task, jobUUID, worker.UUID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
gormTask.Worker = worker gormTask.Worker = worker
gormTask.WorkerID = &worker.ID gormTask.WorkerID = &worker.ID
// Fetch the job, either from the cache or from the database. This is done
// here because the task_state_machine functionality expects that task.Job
// is set.
// TODO: make that code fetch the job details it needs, rather than fetching
// the entire job here.
job := jobCache[gormTask.JobID]
if job == nil {
job, err = db.FetchJob(ctx, jobUUID)
if err != nil {
return nil, jobError(err, "finding job %s of task %s", jobUUID, gormTask.UUID)
}
}
gormTask.Job = job
result[i] = gormTask result[i] = gormTask
} }
return result, nil return result, nil

View File

@ -59,9 +59,8 @@ func (sm *StateMachine) taskStatusChangeOnly(
task *persistence.Task, task *persistence.Task,
newTaskStatus api.TaskStatus, newTaskStatus api.TaskStatus,
) error { ) error {
job := task.Job if task.JobUUID == "" {
if job == nil { log.Panic().Str("task", task.UUID).Msg("task without job UUID, cannot handle this")
log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this")
return nil // Will not run because of the panic. return nil // Will not run because of the panic.
} }
@ -70,7 +69,7 @@ func (sm *StateMachine) taskStatusChangeOnly(
logger := log.With(). logger := log.With().
Str("task", task.UUID). Str("task", task.UUID).
Str("job", job.UUID). Str("job", task.JobUUID).
Str("taskStatusOld", string(oldTaskStatus)). Str("taskStatusOld", string(oldTaskStatus)).
Str("taskStatusNew", string(newTaskStatus)). Str("taskStatusNew", string(newTaskStatus)).
Logger() Logger()
@ -83,7 +82,7 @@ func (sm *StateMachine) taskStatusChangeOnly(
if oldTaskStatus != newTaskStatus { if oldTaskStatus != newTaskStatus {
// logStorage already logs any error, and an error here shouldn't block the // logStorage already logs any error, and an error here shouldn't block the
// rest of the function. // rest of the function.
_ = sm.logStorage.WriteTimestamped(logger, job.UUID, task.UUID, _ = sm.logStorage.WriteTimestamped(logger, task.JobUUID, task.UUID,
fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus)) fmt.Sprintf("task changed status %s -> %s", oldTaskStatus, newTaskStatus))
} }
@ -101,9 +100,13 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
ctx context.Context, task *persistence.Task, oldTaskStatus api.TaskStatus, ctx context.Context, task *persistence.Task, oldTaskStatus api.TaskStatus,
) error { ) error {
job := task.Job job := task.Job
if job == nil {
log.Panic().Str("task", task.UUID).Msg("task without job, cannot handle this")
return nil // Will not run because of the panic.
}
logger := log.With(). logger := log.With().
Str("job", job.UUID). Str("job", task.JobUUID).
Str("task", task.UUID). Str("task", task.UUID).
Str("taskStatusOld", string(oldTaskStatus)). Str("taskStatusOld", string(oldTaskStatus)).
Str("taskStatusNew", string(task.Status)). Str("taskStatusNew", string(task.Status)).

View File

@ -473,8 +473,9 @@ func taskWithStatus(jobStatus api.JobStatus, taskStatus api.TaskStatus) *persist
Model: persistence.Model{ID: 327}, Model: persistence.Model{ID: 327},
UUID: "testtask-0001-4e28-aeea-8cbaf2fc96a5", UUID: "testtask-0001-4e28-aeea-8cbaf2fc96a5",
JobID: job.ID, JobUUID: job.UUID,
Job: &job, JobID: job.ID,
Job: &job,
Status: taskStatus, Status: taskStatus,
} }
@ -486,11 +487,12 @@ func taskWithStatus(jobStatus api.JobStatus, taskStatus api.TaskStatus) *persist
func taskOfSameJob(task *persistence.Task, taskStatus api.TaskStatus) *persistence.Task { func taskOfSameJob(task *persistence.Task, taskStatus api.TaskStatus) *persistence.Task {
newTaskID := task.ID + 1 newTaskID := task.ID + 1
return &persistence.Task{ return &persistence.Task{
Model: persistence.Model{ID: newTaskID}, Model: persistence.Model{ID: newTaskID},
UUID: fmt.Sprintf("testtask-%04d-4e28-aeea-8cbaf2fc96a5", newTaskID), UUID: fmt.Sprintf("testtask-%04d-4e28-aeea-8cbaf2fc96a5", newTaskID),
JobID: task.JobID, JobUUID: task.JobUUID,
Job: task.Job, JobID: task.JobID,
Status: taskStatus, Job: task.Job,
Status: taskStatus,
} }
} }