From 7e5a631f33f2110b4e928e00f3ee1de15717525a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Mon, 28 Feb 2022 12:50:34 +0100 Subject: [PATCH] Cleanup: refactor `updateJobAfterTaskStatusChange()` Break up a complex function into smaller functions. --- .../task_state_machine/task_state_machine.go | 144 ++++++++++-------- 1 file changed, 82 insertions(+), 62 deletions(-) diff --git a/internal/manager/task_state_machine/task_state_machine.go b/internal/manager/task_state_machine/task_state_machine.go index d52df002..69c89e94 100644 --- a/internal/manager/task_state_machine/task_state_machine.go +++ b/internal/manager/task_state_machine/task_state_machine.go @@ -115,23 +115,11 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( Str("taskStatusNew", string(task.Status)). Logger() - // If the job has status 'ifStatus', move it to status 'thenStatus'. - jobStatusIfAThenB := func(ifStatus, thenStatus api.JobStatus) error { - if job.Status != ifStatus { - return nil - } - logger.Info(). - Str("jobStatusOld", string(ifStatus)). - Str("jobStatusNew", string(thenStatus)). - Msg("Job will change status because one of its task changed status") - return sm.JobStatusChange(ctx, job, thenStatus) - } - // Every 'case' in this switch MUST return. Just for sanity's sake. switch task.Status { case api.TaskStatusQueued: // Re-queueing a task on a completed job should re-queue the job too. - return jobStatusIfAThenB(api.JobStatusCompleted, api.JobStatusRequeued) + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued) case api.TaskStatusCancelRequested: // Requesting cancellation of a single task has no influence on the job itself. @@ -142,44 +130,10 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( return nil case api.TaskStatusCanceled: - // Only trigger cancellation/failure of the job if that was actually requested. - // A user can also cancel a single task from the web UI or API, in which - // case the job should just keep running. - if job.Status != api.JobStatusCancelRequested { - return nil - } - // This could be the last 'cancel-requested' task to go to 'canceled'. - hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested) - if err != nil { - return err - } - if !hasCancelReq { - logger.Info().Msg("last task of job went from cancel-requested to canceled") - return sm.JobStatusChange(ctx, job, api.JobStatusCanceled) - } - return nil + return sm.onTaskStatusCanceled(ctx, logger, job) case api.TaskStatusFailed: - // Count the number of failed tasks. If it is over the threshold, fail the job. - numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed) - if err != nil { - return err - } - failedPercentage := int(float64(numFailed) / float64(numTotal) * 100) - failLogger := logger.With(). - Int("taskNumTotal", numTotal). - Int("taskNumFailed", numFailed). - Int("failedPercentage", failedPercentage). - Int("threshold", taskFailJobPercentage). - Logger() - - if failedPercentage >= taskFailJobPercentage { - failLogger.Info().Msg("failing job because too many of its tasks failed") - return sm.JobStatusChange(ctx, job, api.JobStatusFailed) - } - // If the job didn't fail, this failure indicates that at least the job is active. - failLogger.Info().Msg("task failed, but not enough to fail the job") - return jobStatusIfAThenB(api.JobStatusQueued, api.JobStatusActive) + return sm.onTaskStatusFailed(ctx, logger, job) case api.TaskStatusActive, api.TaskStatusSoftFailed: switch job.Status { @@ -192,19 +146,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( } case api.TaskStatusCompleted: - numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted) - if err != nil { - return err - } - if numComplete == numTotal { - logger.Info().Msg("all tasks of job are completed, job is completed") - return sm.JobStatusChange(ctx, job, api.JobStatusCompleted) - } - logger.Info(). - Int("taskNumTotal", numTotal). - Int("taskNumComplete", numComplete). - Msg("task completed; there are more tasks to do") - return jobStatusIfAThenB(api.JobStatusQueued, api.JobStatusActive) + return sm.onTaskStatusCompleted(ctx, logger, job) default: logger.Warn().Msg("task obtained status that Flamenco did not expect") @@ -212,6 +154,84 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange( } } +// If the job has status 'ifStatus', move it to status 'thenStatus'. +func (sm *StateMachine) jobStatusIfAThenB( + ctx context.Context, + logger zerolog.Logger, + job *persistence.Job, + ifStatus, thenStatus api.JobStatus, +) error { + if job.Status != ifStatus { + return nil + } + logger.Info(). + Str("jobStatusOld", string(ifStatus)). + Str("jobStatusNew", string(thenStatus)). + Msg("Job will change status because one of its task changed status") + return sm.JobStatusChange(ctx, job, thenStatus) +} + +// onTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job. +func (sm *StateMachine) onTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { + // Only trigger cancellation/failure of the job if that was actually requested. + // A user can also cancel a single task from the web UI or API, in which + // case the job should just keep running. + if job.Status != api.JobStatusCancelRequested { + return nil + } + // This could be the last 'cancel-requested' task to go to 'canceled'. + hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested) + if err != nil { + return err + } + if !hasCancelReq { + logger.Info().Msg("last task of job went from cancel-requested to canceled") + return sm.JobStatusChange(ctx, job, api.JobStatusCanceled) + } + return nil +} + +// onTaskStatusFailed conditionally escalates the failure of a task to fail the entire job. +func (sm *StateMachine) onTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { + // Count the number of failed tasks. If it is over the threshold, fail the job. + numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed) + if err != nil { + return err + } + failedPercentage := int(float64(numFailed) / float64(numTotal) * 100) + failLogger := logger.With(). + Int("taskNumTotal", numTotal). + Int("taskNumFailed", numFailed). + Int("failedPercentage", failedPercentage). + Int("threshold", taskFailJobPercentage). + Logger() + + if failedPercentage >= taskFailJobPercentage { + failLogger.Info().Msg("failing job because too many of its tasks failed") + return sm.JobStatusChange(ctx, job, api.JobStatusFailed) + } + // If the job didn't fail, this failure indicates that at least the job is active. + failLogger.Info().Msg("task failed, but not enough to fail the job") + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive) +} + +// onTaskStatusCompleted conditionally escalates the completion of a task to complete the entire job. +func (sm *StateMachine) onTaskStatusCompleted(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error { + numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted) + if err != nil { + return err + } + if numComplete == numTotal { + logger.Info().Msg("all tasks of job are completed, job is completed") + return sm.JobStatusChange(ctx, job, api.JobStatusCompleted) + } + logger.Info(). + Int("taskNumTotal", numTotal). + Int("taskNumComplete", numComplete). + Msg("task completed; there are more tasks to do") + return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive) +} + // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks. func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error { // Job status changes can trigger task status changes, which can trigger the