Cleanup: refactor updateJobAfterTaskStatusChange()

Break up a complex function into smaller functions.
Sybren A. Stüvel 2022-02-28 12:50:34 +01:00
parent 41168ff68b
commit 7e5a631f33

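The change keeps only the status dispatch in updateJobAfterTaskStatusChange() and moves each long case body into its own method (jobStatusIfAThenB, onTaskStatusCanceled, onTaskStatusFailed, onTaskStatusCompleted). The standalone sketch below shows the general pattern only; all type, method, and status names in it are invented for illustration and are not Flamenco's.

package main

import (
	"errors"
	"fmt"
)

type stateMachine struct{}

// onStatusChange only decides which handler runs; each handler owns its own logic.
func (sm *stateMachine) onStatusChange(status string) error {
	switch status {
	case "canceled":
		return sm.onCanceled()
	case "failed":
		return sm.onFailed()
	case "completed":
		return sm.onCompleted()
	default:
		return errors.New("unexpected status: " + status)
	}
}

func (sm *stateMachine) onCanceled() error {
	fmt.Println("escalate cancellation to the job")
	return nil
}

func (sm *stateMachine) onFailed() error {
	fmt.Println("maybe fail the whole job")
	return nil
}

func (sm *stateMachine) onCompleted() error {
	fmt.Println("maybe complete the whole job")
	return nil
}

func main() {
	sm := &stateMachine{}
	if err := sm.onStatusChange("completed"); err != nil {
		fmt.Println("error:", err)
	}
}

The real handlers additionally receive the context, a zerolog.Logger, and the job, exactly as the hunks below show.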

@@ -115,23 +115,11 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
Str("taskStatusNew", string(task.Status)).
Logger()
// If the job has status 'ifStatus', move it to status 'thenStatus'.
jobStatusIfAThenB := func(ifStatus, thenStatus api.JobStatus) error {
if job.Status != ifStatus {
return nil
}
logger.Info().
Str("jobStatusOld", string(ifStatus)).
Str("jobStatusNew", string(thenStatus)).
Msg("Job will change status because one of its task changed status")
return sm.JobStatusChange(ctx, job, thenStatus)
}
// Every 'case' in this switch MUST return. Just for sanity's sake.
switch task.Status {
case api.TaskStatusQueued:
// Re-queueing a task on a completed job should re-queue the job too.
return jobStatusIfAThenB(api.JobStatusCompleted, api.JobStatusRequeued)
return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusCompleted, api.JobStatusRequeued)
case api.TaskStatusCancelRequested:
// Requesting cancellation of a single task has no influence on the job itself.
@@ -142,44 +130,10 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
 		return nil
 	case api.TaskStatusCanceled:
-		// Only trigger cancellation/failure of the job if that was actually requested.
-		// A user can also cancel a single task from the web UI or API, in which
-		// case the job should just keep running.
-		if job.Status != api.JobStatusCancelRequested {
-			return nil
-		}
-		// This could be the last 'cancel-requested' task to go to 'canceled'.
-		hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested)
-		if err != nil {
-			return err
-		}
-		if !hasCancelReq {
-			logger.Info().Msg("last task of job went from cancel-requested to canceled")
-			return sm.JobStatusChange(ctx, job, api.JobStatusCanceled)
-		}
-		return nil
+		return sm.onTaskStatusCanceled(ctx, logger, job)
 	case api.TaskStatusFailed:
-		// Count the number of failed tasks. If it is over the threshold, fail the job.
-		numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed)
-		if err != nil {
-			return err
-		}
-		failedPercentage := int(float64(numFailed) / float64(numTotal) * 100)
-		failLogger := logger.With().
-			Int("taskNumTotal", numTotal).
-			Int("taskNumFailed", numFailed).
-			Int("failedPercentage", failedPercentage).
-			Int("threshold", taskFailJobPercentage).
-			Logger()
-		if failedPercentage >= taskFailJobPercentage {
-			failLogger.Info().Msg("failing job because too many of its tasks failed")
-			return sm.JobStatusChange(ctx, job, api.JobStatusFailed)
-		}
-		// If the job didn't fail, this failure indicates that at least the job is active.
-		failLogger.Info().Msg("task failed, but not enough to fail the job")
-		return jobStatusIfAThenB(api.JobStatusQueued, api.JobStatusActive)
+		return sm.onTaskStatusFailed(ctx, logger, job)
 	case api.TaskStatusActive, api.TaskStatusSoftFailed:
 		switch job.Status {
@@ -192,19 +146,7 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
 		}
 	case api.TaskStatusCompleted:
-		numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
-		if err != nil {
-			return err
-		}
-		if numComplete == numTotal {
-			logger.Info().Msg("all tasks of job are completed, job is completed")
-			return sm.JobStatusChange(ctx, job, api.JobStatusCompleted)
-		}
-		logger.Info().
-			Int("taskNumTotal", numTotal).
-			Int("taskNumComplete", numComplete).
-			Msg("task completed; there are more tasks to do")
-		return jobStatusIfAThenB(api.JobStatusQueued, api.JobStatusActive)
+		return sm.onTaskStatusCompleted(ctx, logger, job)
 	default:
 		logger.Warn().Msg("task obtained status that Flamenco did not expect")
@@ -212,6 +154,84 @@ func (sm *StateMachine) updateJobAfterTaskStatusChange(
 	}
 }
+
+// If the job has status 'ifStatus', move it to status 'thenStatus'.
+func (sm *StateMachine) jobStatusIfAThenB(
+	ctx context.Context,
+	logger zerolog.Logger,
+	job *persistence.Job,
+	ifStatus, thenStatus api.JobStatus,
+) error {
+	if job.Status != ifStatus {
+		return nil
+	}
+	logger.Info().
+		Str("jobStatusOld", string(ifStatus)).
+		Str("jobStatusNew", string(thenStatus)).
+		Msg("Job will change status because one of its tasks changed status")
+	return sm.JobStatusChange(ctx, job, thenStatus)
+}
+
+// onTaskStatusCanceled conditionally escalates the cancellation of a task to cancel the job.
+func (sm *StateMachine) onTaskStatusCanceled(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
+	// Only trigger cancellation/failure of the job if that was actually requested.
+	// A user can also cancel a single task from the web UI or API, in which
+	// case the job should just keep running.
+	if job.Status != api.JobStatusCancelRequested {
+		return nil
+	}
+	// This could be the last 'cancel-requested' task to go to 'canceled'.
+	hasCancelReq, err := sm.persist.JobHasTasksInStatus(ctx, job, api.TaskStatusCancelRequested)
+	if err != nil {
+		return err
+	}
+	if !hasCancelReq {
+		logger.Info().Msg("last task of job went from cancel-requested to canceled")
+		return sm.JobStatusChange(ctx, job, api.JobStatusCanceled)
+	}
+	return nil
+}
+
+// onTaskStatusFailed conditionally escalates the failure of a task to fail the entire job.
+func (sm *StateMachine) onTaskStatusFailed(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
+	// Count the number of failed tasks. If it is over the threshold, fail the job.
+	numFailed, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusFailed)
+	if err != nil {
+		return err
+	}
+	failedPercentage := int(float64(numFailed) / float64(numTotal) * 100)
+	failLogger := logger.With().
+		Int("taskNumTotal", numTotal).
+		Int("taskNumFailed", numFailed).
+		Int("failedPercentage", failedPercentage).
+		Int("threshold", taskFailJobPercentage).
+		Logger()
+	if failedPercentage >= taskFailJobPercentage {
+		failLogger.Info().Msg("failing job because too many of its tasks failed")
+		return sm.JobStatusChange(ctx, job, api.JobStatusFailed)
+	}
+	// If the job didn't fail, this failure indicates that at least the job is active.
+	failLogger.Info().Msg("task failed, but not enough to fail the job")
+	return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive)
+}
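As a side note on the arithmetic in onTaskStatusFailed: failedPercentage is truncated toward zero by the int conversion, so a fraction just under the threshold does not fail the job. A tiny standalone illustration, not part of this commit; the threshold value 10 below is assumed for the example only, the real value is the package constant taskFailJobPercentage.

package main

import "fmt"

func main() {
	const taskFailJobPercentage = 10 // assumed example value, not taken from this commit
	numFailed, numTotal := 3, 40
	// Same expression as in onTaskStatusFailed: 7.5% truncates to 7.
	failedPercentage := int(float64(numFailed) / float64(numTotal) * 100)
	fmt.Printf("failed %d/%d tasks (%d%%), threshold %d%%, fail job: %v\n",
		numFailed, numTotal, failedPercentage, taskFailJobPercentage,
		failedPercentage >= taskFailJobPercentage)
	// Prints: failed 3/40 tasks (7%), threshold 10%, fail job: false
}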
+
+// onTaskStatusCompleted conditionally escalates the completion of a task to complete the entire job.
+func (sm *StateMachine) onTaskStatusCompleted(ctx context.Context, logger zerolog.Logger, job *persistence.Job) error {
+	numComplete, numTotal, err := sm.persist.CountTasksOfJobInStatus(ctx, job, api.TaskStatusCompleted)
+	if err != nil {
+		return err
+	}
+	if numComplete == numTotal {
+		logger.Info().Msg("all tasks of job are completed, job is completed")
+		return sm.JobStatusChange(ctx, job, api.JobStatusCompleted)
+	}
+	logger.Info().
+		Int("taskNumTotal", numTotal).
+		Int("taskNumComplete", numComplete).
+		Msg("task completed; there are more tasks to do")
+	return sm.jobStatusIfAThenB(ctx, logger, job, api.JobStatusQueued, api.JobStatusActive)
+}

 // JobStatusChange gives a Job a new status, and handles the resulting status changes on its tasks.
 func (sm *StateMachine) JobStatusChange(ctx context.Context, job *persistence.Job, newJobStatus api.JobStatus) error {
 	// Job status changes can trigger task status changes, which can trigger the