From fe1627dd855c5474c48b52a33a1092dd3a5a4243 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 10 Jun 2022 14:58:44 +0200 Subject: [PATCH] Cleanup: timeout checker, move task-specific code to `tasks.go` Just a cleanup to prepare for the addition of worker timeouts. --- internal/manager/timeout_checker/tasks.go | 86 +++++++++++ .../manager/timeout_checker/tasks_test.go | 141 ++++++++++++++++++ .../timeout_checker/timeout_checker.go | 76 ---------- .../timeout_checker/timeout_checker_test.go | 130 ---------------- 4 files changed, 227 insertions(+), 206 deletions(-) create mode 100644 internal/manager/timeout_checker/tasks.go create mode 100644 internal/manager/timeout_checker/tasks_test.go diff --git a/internal/manager/timeout_checker/tasks.go b/internal/manager/timeout_checker/tasks.go new file mode 100644 index 00000000..e887dece --- /dev/null +++ b/internal/manager/timeout_checker/tasks.go @@ -0,0 +1,86 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +func (ttc *TimeoutChecker) checkTasks(ctx context.Context) { + timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.taskTimeout) + logger := log.With(). + Time("threshold", timeoutThreshold.Local()). + Logger() + logger.Debug().Msg("TimeoutChecker: finding active tasks that have not been touched since threshold") + + tasks, err := ttc.persist.FetchTimedOutTasks(ctx, timeoutThreshold) + if err != nil { + log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out tasks from database") + return + } + + if len(tasks) == 0 { + logger.Trace().Msg("TimeoutChecker: no timed-out tasks") + return + } + logger.Debug(). + Int("numTasks", len(tasks)). + Msg("TimeoutChecker: failing all active tasks that have not been touched since threshold") + + for _, task := range tasks { + ttc.timeoutTask(ctx, task) + } +} + +// timeoutTask marks a task as 'failed' due to a timeout. +func (ttc *TimeoutChecker) timeoutTask(ctx context.Context, task *persistence.Task) { + workerIdent, logger := ttc.assignedWorker(task) + + task.Activity = fmt.Sprintf("Task timed out on worker %s", workerIdent) + err := ttc.taskStateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed) + if err != nil { + logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out task to database") + } + + err = ttc.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, + fmt.Sprintf("Task timed out. It was assigned to worker %s, but untouched since %s", + workerIdent, task.LastTouchedAt.Format(time.RFC3339))) + if err != nil { + logger.Error().Err(err).Msg("TimeoutChecker: error writing timeout info to the task log") + } +} + +// assignedWorker returns a description of the worker assigned to this task, +// and a logger configured for it. +func (ttc *TimeoutChecker) assignedWorker(task *persistence.Task) (string, zerolog.Logger) { + logCtx := log.With().Str("task", task.UUID) + + if task.WorkerID == nil { + logger := logCtx.Logger() + logger.Warn().Msg("TimeoutChecker: task timed out, but was not assigned to any worker") + return "-unassigned-", logger + } + + if task.Worker == nil { + logger := logCtx.Logger() + logger.Warn().Uint("workerDBID", *task.WorkerID). + Msg("TimeoutChecker: task is assigned to worker that no longer exists") + return "-unknown-", logger + } + + logCtx = logCtx. + Str("worker", task.Worker.UUID). + Str("workerName", task.Worker.Name) + logger := logCtx.Logger() + logger.Warn().Msg("TimeoutChecker: task timed out") + + return task.Worker.Identifier(), logger +} diff --git a/internal/manager/timeout_checker/tasks_test.go b/internal/manager/timeout_checker/tasks_test.go new file mode 100644 index 00000000..af754bcf --- /dev/null +++ b/internal/manager/timeout_checker/tasks_test.go @@ -0,0 +1,141 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "gorm.io/gorm" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +const taskTimeout = 20 * time.Minute + +func TestTimeoutCheckerTiming(t *testing.T) { + ttc, finish, mocks := timeoutCheckerTestFixtures(t) + defer finish() + + mocks.run(ttc) + + // Wait for the timeout checker to actually be sleeping, otherwise it could + // have a different sleep-start time than we expect. + time.Sleep(1 * time.Millisecond) + + // Determine the deadlines relative to the initial clock value. + initialTime := mocks.clock.Now().UTC() + deadlines := []time.Time{ + initialTime.Add(timeoutInitialSleep - taskTimeout), + initialTime.Add(timeoutInitialSleep - taskTimeout + 1*timeoutCheckInterval), + initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval), + } + + // Expect three fetches, one after the initial sleep time, and two a regular interval later. + fetchTimes := make([]time.Time, len(deadlines)) + firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]). + DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { + fetchTimes[0] = mocks.clock.Now().UTC() + return []*persistence.Task{}, nil + }) + + secondCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[1]). + DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { + fetchTimes[1] = mocks.clock.Now().UTC() + // Return a database error. This shouldn't break the check loop. + return []*persistence.Task{}, errors.New("testing what errors do") + }). + After(firstCall) + + mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[2]). + DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { + fetchTimes[2] = mocks.clock.Now().UTC() + return []*persistence.Task{}, nil + }). + After(secondCall) + + mocks.clock.Add(2 * time.Minute) // Should still be sleeping. + mocks.clock.Add(2 * time.Minute) // Should still be sleeping. + mocks.clock.Add(time.Minute) // Should trigger the first fetch. + mocks.clock.Add(time.Minute) // Should trigger the second fetch. + mocks.clock.Add(time.Minute) // Should trigger the third fetch. + + // Wait for the timeout checker to actually run & hit the expected calls. + time.Sleep(1 * time.Millisecond) + + for idx, fetchTime := range fetchTimes { + // Check for zero values first, because they can be a bit confusing in the assert.Equal() logs. + if !assert.Falsef(t, fetchTime.IsZero(), "fetchTime[%d] should not be zero", idx) { + continue + } + expect := initialTime.Add(timeoutInitialSleep + time.Duration(idx)*timeoutCheckInterval) + assert.Equalf(t, expect, fetchTime, "fetchTime[%d] not as expected", idx) + } +} + +func TestTaskTimeout(t *testing.T) { + // Canary test: if these constants do not have the expected value, the test + // will fail rather cryptically. + if !assert.Equal(t, 5*time.Minute, timeoutInitialSleep, "timeoutInitialSleep does not have the expected value") || + !assert.Equal(t, 1*time.Minute, timeoutCheckInterval, "timeoutCheckInterval does not have the expected value") { + t.FailNow() + } + + ttc, finish, mocks := timeoutCheckerTestFixtures(t) + defer finish() + + mocks.run(ttc) + + // Wait for the timeout checker to actually be sleeping, otherwise it could + // have a different sleep-start time than we expect. + time.Sleep(1 * time.Millisecond) + + lastTime := mocks.clock.Now().UTC().Add(-1 * time.Hour) + + job := persistence.Job{UUID: "JOB-UUID"} + worker := persistence.Worker{ + UUID: "WORKER-UUID", + Name: "Tester", + Model: gorm.Model{ID: 47}, + } + taskUnassigned := persistence.Task{ + UUID: "TASK-UUID-UNASSIGNED", + Job: &job, + LastTouchedAt: lastTime, + } + taskUnknownWorker := persistence.Task{ + UUID: "TASK-UUID-UNKNOWN", + Job: &job, + LastTouchedAt: lastTime, + WorkerID: &worker.ID, + } + taskAssigned := persistence.Task{ + UUID: "TASK-UUID-ASSIGNED", + Job: &job, + LastTouchedAt: lastTime, + WorkerID: &worker.ID, + Worker: &worker, + } + + mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()). + Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil) + + mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnassigned, api.TaskStatusFailed) + mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnknownWorker, api.TaskStatusFailed) + mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskAssigned, api.TaskStatusFailed) + + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnassigned.UUID, + "Task timed out. It was assigned to worker -unassigned-, but untouched since 1969-12-31T23:00:00Z") + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnknownWorker.UUID, + "Task timed out. It was assigned to worker -unknown-, but untouched since 1969-12-31T23:00:00Z") + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskAssigned.UUID, + "Task timed out. It was assigned to worker Tester (WORKER-UUID), but untouched since 1969-12-31T23:00:00Z") + + // All the timeouts should be handled after the initial sleep. + mocks.clock.Add(timeoutInitialSleep) +} diff --git a/internal/manager/timeout_checker/timeout_checker.go b/internal/manager/timeout_checker/timeout_checker.go index e284fb7e..2f8d9dc6 100644 --- a/internal/manager/timeout_checker/timeout_checker.go +++ b/internal/manager/timeout_checker/timeout_checker.go @@ -4,15 +4,10 @@ package timeout_checker import ( "context" - "fmt" "time" "github.com/benbjohnson/clock" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" - - "git.blender.org/flamenco/internal/manager/persistence" - "git.blender.org/flamenco/pkg/api" ) // Interval for checking all active tasks for timeouts. @@ -81,77 +76,6 @@ func (ttc *TimeoutChecker) Run(ctx context.Context) { } } -func (ttc *TimeoutChecker) checkTasks(ctx context.Context) { - timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.taskTimeout) - logger := log.With(). - Time("threshold", timeoutThreshold.Local()). - Logger() - logger.Debug().Msg("TimeoutChecker: finding active tasks that have not been touched since threshold") - - tasks, err := ttc.persist.FetchTimedOutTasks(ctx, timeoutThreshold) - if err != nil { - log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out tasks from database") - return - } - - if len(tasks) == 0 { - logger.Trace().Msg("TimeoutChecker: no timed-out tasks") - return - } - logger.Debug(). - Int("numTasks", len(tasks)). - Msg("TimeoutChecker: failing all active tasks that have not been touched since threshold") - - for _, task := range tasks { - ttc.timeoutTask(ctx, task) - } -} - -// timeoutTask marks a task as 'failed' due to a timeout. -func (ttc *TimeoutChecker) timeoutTask(ctx context.Context, task *persistence.Task) { - workerIdent, logger := ttc.assignedWorker(task) - - task.Activity = fmt.Sprintf("Task timed out on worker %s", workerIdent) - err := ttc.taskStateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed) - if err != nil { - logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out task to database") - } - - err = ttc.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, - fmt.Sprintf("Task timed out. It was assigned to worker %s, but untouched since %s", - workerIdent, task.LastTouchedAt.Format(time.RFC3339))) - if err != nil { - logger.Error().Err(err).Msg("TimeoutChecker: error writing timeout info to the task log") - } -} - -// assignedWorker returns a description of the worker assigned to this task, -// and a logger configured for it. -func (ttc *TimeoutChecker) assignedWorker(task *persistence.Task) (string, zerolog.Logger) { - logCtx := log.With().Str("task", task.UUID) - - if task.WorkerID == nil { - logger := logCtx.Logger() - logger.Warn().Msg("TimeoutChecker: task timed out, but was not assigned to any worker") - return "-unassigned-", logger - } - - if task.Worker == nil { - logger := logCtx.Logger() - logger.Warn().Uint("workerDBID", *task.WorkerID). - Msg("TimeoutChecker: task is assigned to worker that no longer exists") - return "-unknown-", logger - } - - logCtx = logCtx. - Str("worker", task.Worker.UUID). - Str("workerName", task.Worker.Name) - logger := logCtx.Logger() - logger.Warn().Msg("TimeoutChecker: task timed out") - - return task.Worker.Identifier(), logger -} - // func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) { // timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval) // log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold) diff --git a/internal/manager/timeout_checker/timeout_checker_test.go b/internal/manager/timeout_checker/timeout_checker_test.go index 94d68ab6..b949d499 100644 --- a/internal/manager/timeout_checker/timeout_checker_test.go +++ b/internal/manager/timeout_checker/timeout_checker_test.go @@ -4,145 +4,15 @@ package timeout_checker import ( "context" - "errors" "sync" "testing" - "time" "github.com/benbjohnson/clock" "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "gorm.io/gorm" - "git.blender.org/flamenco/internal/manager/persistence" "git.blender.org/flamenco/internal/manager/timeout_checker/mocks" - "git.blender.org/flamenco/pkg/api" ) -const taskTimeout = 20 * time.Minute - -func TestTimeoutCheckerTiming(t *testing.T) { - ttc, finish, mocks := timeoutCheckerTestFixtures(t) - defer finish() - - mocks.run(ttc) - - // Wait for the timeout checker to actually be sleeping, otherwise it could - // have a different sleep-start time than we expect. - time.Sleep(1 * time.Millisecond) - - // Determine the deadlines relative to the initial clock value. - initialTime := mocks.clock.Now().UTC() - deadlines := []time.Time{ - initialTime.Add(timeoutInitialSleep - taskTimeout), - initialTime.Add(timeoutInitialSleep - taskTimeout + 1*timeoutCheckInterval), - initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval), - } - - // Expect three fetches, one after the initial sleep time, and two a regular interval later. - fetchTimes := make([]time.Time, len(deadlines)) - firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]). - DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { - fetchTimes[0] = mocks.clock.Now().UTC() - return []*persistence.Task{}, nil - }) - - secondCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[1]). - DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { - fetchTimes[1] = mocks.clock.Now().UTC() - // Return a database error. This shouldn't break the check loop. - return []*persistence.Task{}, errors.New("testing what errors do") - }). - After(firstCall) - - mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[2]). - DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { - fetchTimes[2] = mocks.clock.Now().UTC() - return []*persistence.Task{}, nil - }). - After(secondCall) - - mocks.clock.Add(2 * time.Minute) // Should still be sleeping. - mocks.clock.Add(2 * time.Minute) // Should still be sleeping. - mocks.clock.Add(time.Minute) // Should trigger the first fetch. - mocks.clock.Add(time.Minute) // Should trigger the second fetch. - mocks.clock.Add(time.Minute) // Should trigger the third fetch. - - // Wait for the timeout checker to actually run & hit the expected calls. - time.Sleep(1 * time.Millisecond) - - for idx, fetchTime := range fetchTimes { - // Check for zero values first, because they can be a bit confusing in the assert.Equal() logs. - if !assert.Falsef(t, fetchTime.IsZero(), "fetchTime[%d] should not be zero", idx) { - continue - } - expect := initialTime.Add(timeoutInitialSleep + time.Duration(idx)*timeoutCheckInterval) - assert.Equalf(t, expect, fetchTime, "fetchTime[%d] not as expected", idx) - } -} - -func TestTaskTimeout(t *testing.T) { - // Canary test: if these constants do not have the expected value, the test - // will fail rather cryptically. - if !assert.Equal(t, 5*time.Minute, timeoutInitialSleep, "timeoutInitialSleep does not have the expected value") || - !assert.Equal(t, 1*time.Minute, timeoutCheckInterval, "timeoutCheckInterval does not have the expected value") { - t.FailNow() - } - - ttc, finish, mocks := timeoutCheckerTestFixtures(t) - defer finish() - - mocks.run(ttc) - - // Wait for the timeout checker to actually be sleeping, otherwise it could - // have a different sleep-start time than we expect. - time.Sleep(1 * time.Millisecond) - - lastTime := mocks.clock.Now().UTC().Add(-1 * time.Hour) - - job := persistence.Job{UUID: "JOB-UUID"} - worker := persistence.Worker{ - UUID: "WORKER-UUID", - Name: "Tester", - Model: gorm.Model{ID: 47}, - } - taskUnassigned := persistence.Task{ - UUID: "TASK-UUID-UNASSIGNED", - Job: &job, - LastTouchedAt: lastTime, - } - taskUnknownWorker := persistence.Task{ - UUID: "TASK-UUID-UNKNOWN", - Job: &job, - LastTouchedAt: lastTime, - WorkerID: &worker.ID, - } - taskAssigned := persistence.Task{ - UUID: "TASK-UUID-ASSIGNED", - Job: &job, - LastTouchedAt: lastTime, - WorkerID: &worker.ID, - Worker: &worker, - } - - mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()). - Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil) - - mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnassigned, api.TaskStatusFailed) - mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnknownWorker, api.TaskStatusFailed) - mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskAssigned, api.TaskStatusFailed) - - mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnassigned.UUID, - "Task timed out. It was assigned to worker -unassigned-, but untouched since 1969-12-31T23:00:00Z") - mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnknownWorker.UUID, - "Task timed out. It was assigned to worker -unknown-, but untouched since 1969-12-31T23:00:00Z") - mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskAssigned.UUID, - "Task timed out. It was assigned to worker Tester (WORKER-UUID), but untouched since 1969-12-31T23:00:00Z") - - // All the timeouts should be handled after the initial sleep. - mocks.clock.Add(timeoutInitialSleep) -} - type TimeoutCheckerMocks struct { clock *clock.Mock persist *mocks.MockPersistenceService