diff --git a/cmd/flamenco-manager/main.go b/cmd/flamenco-manager/main.go index 231229f1..1bda962b 100644 --- a/cmd/flamenco-manager/main.go +++ b/cmd/flamenco-manager/main.go @@ -35,6 +35,7 @@ import ( "git.blender.org/flamenco/internal/manager/swagger_ui" "git.blender.org/flamenco/internal/manager/task_logs" "git.blender.org/flamenco/internal/manager/task_state_machine" + "git.blender.org/flamenco/internal/manager/timeout_checker" "git.blender.org/flamenco/internal/manager/webupdates" "git.blender.org/flamenco/internal/own_url" "git.blender.org/flamenco/internal/upnp_ssdp" @@ -106,11 +107,16 @@ func main() { // // go persist.PeriodicMaintenanceLoop(mainCtx) + timeService := clock.New() webUpdater := webupdates.New() taskStateMachine := task_state_machine.NewStateMachine(persist, webUpdater) - flamenco := buildFlamencoAPI(configService, persist, taskStateMachine, webUpdater) + logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater) + flamenco := buildFlamencoAPI(timeService, configService, persist, taskStateMachine, logStorage, webUpdater) e := buildWebService(flamenco, persist, ssdp, webUpdater, urls) + timeoutChecker := timeout_checker.New( + configService.Get().TaskTimeout, timeService, persist, taskStateMachine, logStorage) + installSignalHandler(mainCtxCancel) // Before doing anything new, clean up in case we made a mess in an earlier run. @@ -144,22 +150,29 @@ func main() { }() } + // Start the timeout checker. + wg.Add(1) + go func() { + defer wg.Done() + timeoutChecker.Run(mainCtx) + }() + wg.Wait() log.Info().Msg("shutdown complete") } func buildFlamencoAPI( + timeService clock.Clock, configService *config.Service, persist *persistence.DB, taskStateMachine *task_state_machine.StateMachine, + logStorage *task_logs.Storage, webUpdater *webupdates.BiDirComms, ) api.ServerInterface { - timeService := clock.New() compiler, err := job_compilers.Load(timeService) if err != nil { log.Fatal().Err(err).Msg("error loading job compilers") } - logStorage := task_logs.NewStorage(configService.Get().TaskLogsPath, timeService, webUpdater) shamanServer := shaman.NewServer(configService.Get().Shaman, nil) flamenco := api_impl.NewFlamenco( compiler, persist, webUpdater, logStorage, configService, diff --git a/internal/manager/config/config.go b/internal/manager/config/config.go index a6c1a605..cecdb2c0 100644 --- a/internal/manager/config/config.go +++ b/internal/manager/config/config.go @@ -84,7 +84,7 @@ type Base struct { // TLSCert string `yaml:"tlscert"` // ACMEDomainName string `yaml:"acme_domain_name"` // for the ACME Let's Encrypt client - // ActiveTaskTimeoutInterval time.Duration `yaml:"active_task_timeout_interval"` + TaskTimeout time.Duration `yaml:"task_timeout"` // ActiveWorkerTimeoutInterval time.Duration `yaml:"active_worker_timeout_interval"` // WorkerCleanupMaxAge time.Duration `yaml:"worker_cleanup_max_age"` diff --git a/internal/manager/config/defaults.go b/internal/manager/config/defaults.go index 344dcbc7..a32075f5 100644 --- a/internal/manager/config/defaults.go +++ b/internal/manager/config/defaults.go @@ -32,7 +32,7 @@ var defaultConfig = Conf{ }, }, - // ActiveTaskTimeoutInterval: 10 * time.Minute, + TaskTimeout: 10 * time.Minute, // ActiveWorkerTimeoutInterval: 1 * time.Minute, // // Days are assumed to be 24 hours long. This is not exactly accurate, but should diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 8b54601f..80b7d240 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -45,8 +45,8 @@ type Task struct { // Which worker is/was working on this. WorkerID *uint - Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` - LastTouchedAt time.Time + Worker *Worker `gorm:"foreignkey:WorkerID;references:ID;constraint:OnDelete:CASCADE"` + LastTouchedAt time.Time // Should contain UTC timestamps. // Dependencies are tasks that need to be completed before this one can run. Dependencies []*Task `gorm:"many2many:task_dependencies;constraint:OnDelete:CASCADE"` diff --git a/internal/manager/persistence/timeout.go b/internal/manager/persistence/timeout.go new file mode 100644 index 00000000..fbf1d075 --- /dev/null +++ b/internal/manager/persistence/timeout.go @@ -0,0 +1,26 @@ +package persistence + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "time" + + "git.blender.org/flamenco/pkg/api" +) + +// This file contains functions for dealing with task/worker timeouts. Not database timeouts. + +func (db *DB) FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*Task, error) { + result := []*Task{} + tx := db.gormDB.WithContext(ctx). + Model(&Task{}). + Joins("Job"). + Where("tasks.status = ?", api.TaskStatusActive). + Where("tasks.last_touched_at <= ?", untouchedSince). + Scan(&result) + if tx.Error != nil { + return nil, taskError(tx.Error, "finding timed out tasks (untouched since %s)", untouchedSince.String()) + } + return result, nil +} diff --git a/internal/manager/persistence/timeout_test.go b/internal/manager/persistence/timeout_test.go new file mode 100644 index 00000000..b9dfa9de --- /dev/null +++ b/internal/manager/persistence/timeout_test.go @@ -0,0 +1,51 @@ +package persistence + +import ( + "testing" + "time" + + "git.blender.org/flamenco/pkg/api" + "github.com/stretchr/testify/assert" +) + +// SPDX-License-Identifier: GPL-3.0-or-later + +func TestFetchTimedOutTasks(t *testing.T) { + ctx, close, db, job, _ := jobTasksTestFixtures(t) + defer close() + + tasks, err := db.FetchTasksOfJob(ctx, job) + if !assert.NoError(t, err) { + t.FailNow() + } + + now := db.gormDB.NowFunc() + deadline := now.Add(-5 * time.Minute) + + // Mark the task as last touched before the deadline, i.e. old enough for a timeout. + task := tasks[0] + task.LastTouchedAt = deadline.Add(-1 * time.Minute) + assert.NoError(t, db.SaveTask(ctx, task)) + + w := createWorker(ctx, t, db) + assert.NoError(t, db.TaskAssignToWorker(ctx, task, w)) + + // The task should still not be returned, as it's not in 'active' state. + timedout, err := db.FetchTimedOutTasks(ctx, deadline) + assert.NoError(t, err) + assert.Empty(t, timedout) + + // Mark as Active: + task.Status = api.TaskStatusActive + assert.NoError(t, db.SaveTask(ctx, task)) + + // Now it should time out: + timedout, err = db.FetchTimedOutTasks(ctx, deadline) + assert.NoError(t, err) + if assert.Len(t, timedout, 1) { + // Other fields will be different, like the 'UpdatedAt' field -- this just + // tests that the expected task is returned. + assert.Equal(t, task.UUID, timedout[0].UUID) + assert.Equal(t, job, task.Job, "the job should be included in the result as well") + } +} diff --git a/internal/manager/timeout_checker/interfaces.go b/internal/manager/timeout_checker/interfaces.go new file mode 100644 index 00000000..5c63e84a --- /dev/null +++ b/internal/manager/timeout_checker/interfaces.go @@ -0,0 +1,34 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "time" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/task_state_machine" + "git.blender.org/flamenco/pkg/api" + "github.com/rs/zerolog" +) + +// Generate mock implementations of these interfaces. +//go:generate go run github.com/golang/mock/mockgen -destination mocks/interfaces_mock.gen.go -package mocks git.blender.org/flamenco/internal/manager/timeout_checker PersistenceService,TaskStateMachine,LogStorage + +type PersistenceService interface { + FetchTimedOutTasks(ctx context.Context, untouchedSince time.Time) ([]*persistence.Task, error) +} + +var _ PersistenceService = (*persistence.DB)(nil) + +type TaskStateMachine interface { + // TaskStatusChange gives a Task a new status, and handles the resulting status changes on the job. + TaskStatusChange(ctx context.Context, task *persistence.Task, newStatus api.TaskStatus) error +} + +var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) + +// LogStorage is used to append timeout messages to task logs. +type LogStorage interface { + WriteTimestamped(logger zerolog.Logger, jobID, taskID string, logText string) error +} diff --git a/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go new file mode 100644 index 00000000..ee98f27b --- /dev/null +++ b/internal/manager/timeout_checker/mocks/interfaces_mock.gen.go @@ -0,0 +1,128 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: git.blender.org/flamenco/internal/manager/timeout_checker (interfaces: PersistenceService,TaskStateMachine,LogStorage) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + persistence "git.blender.org/flamenco/internal/manager/persistence" + api "git.blender.org/flamenco/pkg/api" + gomock "github.com/golang/mock/gomock" + zerolog "github.com/rs/zerolog" +) + +// MockPersistenceService is a mock of PersistenceService interface. +type MockPersistenceService struct { + ctrl *gomock.Controller + recorder *MockPersistenceServiceMockRecorder +} + +// MockPersistenceServiceMockRecorder is the mock recorder for MockPersistenceService. +type MockPersistenceServiceMockRecorder struct { + mock *MockPersistenceService +} + +// NewMockPersistenceService creates a new mock instance. +func NewMockPersistenceService(ctrl *gomock.Controller) *MockPersistenceService { + mock := &MockPersistenceService{ctrl: ctrl} + mock.recorder = &MockPersistenceServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPersistenceService) EXPECT() *MockPersistenceServiceMockRecorder { + return m.recorder +} + +// FetchTimedOutTasks mocks base method. +func (m *MockPersistenceService) FetchTimedOutTasks(arg0 context.Context, arg1 time.Time) ([]*persistence.Task, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchTimedOutTasks", arg0, arg1) + ret0, _ := ret[0].([]*persistence.Task) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchTimedOutTasks indicates an expected call of FetchTimedOutTasks. +func (mr *MockPersistenceServiceMockRecorder) FetchTimedOutTasks(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchTimedOutTasks", reflect.TypeOf((*MockPersistenceService)(nil).FetchTimedOutTasks), arg0, arg1) +} + +// MockTaskStateMachine is a mock of TaskStateMachine interface. +type MockTaskStateMachine struct { + ctrl *gomock.Controller + recorder *MockTaskStateMachineMockRecorder +} + +// MockTaskStateMachineMockRecorder is the mock recorder for MockTaskStateMachine. +type MockTaskStateMachineMockRecorder struct { + mock *MockTaskStateMachine +} + +// NewMockTaskStateMachine creates a new mock instance. +func NewMockTaskStateMachine(ctrl *gomock.Controller) *MockTaskStateMachine { + mock := &MockTaskStateMachine{ctrl: ctrl} + mock.recorder = &MockTaskStateMachineMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskStateMachine) EXPECT() *MockTaskStateMachineMockRecorder { + return m.recorder +} + +// TaskStatusChange mocks base method. +func (m *MockTaskStateMachine) TaskStatusChange(arg0 context.Context, arg1 *persistence.Task, arg2 api.TaskStatus) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TaskStatusChange", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// TaskStatusChange indicates an expected call of TaskStatusChange. +func (mr *MockTaskStateMachineMockRecorder) TaskStatusChange(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskStatusChange", reflect.TypeOf((*MockTaskStateMachine)(nil).TaskStatusChange), arg0, arg1, arg2) +} + +// MockLogStorage is a mock of LogStorage interface. +type MockLogStorage struct { + ctrl *gomock.Controller + recorder *MockLogStorageMockRecorder +} + +// MockLogStorageMockRecorder is the mock recorder for MockLogStorage. +type MockLogStorageMockRecorder struct { + mock *MockLogStorage +} + +// NewMockLogStorage creates a new mock instance. +func NewMockLogStorage(ctrl *gomock.Controller) *MockLogStorage { + mock := &MockLogStorage{ctrl: ctrl} + mock.recorder = &MockLogStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLogStorage) EXPECT() *MockLogStorageMockRecorder { + return m.recorder +} + +// WriteTimestamped mocks base method. +func (m *MockLogStorage) WriteTimestamped(arg0 zerolog.Logger, arg1, arg2, arg3 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteTimestamped", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteTimestamped indicates an expected call of WriteTimestamped. +func (mr *MockLogStorageMockRecorder) WriteTimestamped(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTimestamped", reflect.TypeOf((*MockLogStorage)(nil).WriteTimestamped), arg0, arg1, arg2, arg3) +} diff --git a/internal/manager/timeout_checker/timeout_checker.go b/internal/manager/timeout_checker/timeout_checker.go new file mode 100644 index 00000000..e284fb7e --- /dev/null +++ b/internal/manager/timeout_checker/timeout_checker.go @@ -0,0 +1,181 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "fmt" + "time" + + "github.com/benbjohnson/clock" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/pkg/api" +) + +// Interval for checking all active tasks for timeouts. +const timeoutCheckInterval = 1 * time.Minute + +// Delay for the intial check. This gives workers a chance to reconnect to the Manager +// and send updates after the Manager has started. +const timeoutInitialSleep = 5 * time.Minute + +// TimeoutChecker periodically times out tasks and workers if the worker hasn't sent any update recently. +type TimeoutChecker struct { + taskTimeout time.Duration + + clock clock.Clock + persist PersistenceService + taskStateMachine TaskStateMachine + logStorage LogStorage +} + +// New creates a new TimeoutChecker. +func New( + taskTimeout time.Duration, + clock clock.Clock, + persist PersistenceService, + taskStateMachine TaskStateMachine, + logStorage LogStorage, +) *TimeoutChecker { + return &TimeoutChecker{ + taskTimeout: taskTimeout, + + clock: clock, + persist: persist, + taskStateMachine: taskStateMachine, + logStorage: logStorage, + } +} + +// Run runs the timeout checker until the context closes. +func (ttc *TimeoutChecker) Run(ctx context.Context) { + defer log.Info().Msg("TimeoutChecker: shutting down") + + if ttc.taskTimeout == 0 { + log.Warn().Msg("TimeoutChecker: no timeout duration configured, will not check for task timeouts") + return + } + + log.Info(). + Str("taskTimeout", ttc.taskTimeout.String()). + Str("initialSleep", timeoutInitialSleep.String()). + Str("checkInterval", timeoutCheckInterval.String()). + Msg("TimeoutChecker: starting up") + + // Start with a delay, so that workers get a chance to push their updates + // after the manager has started up. + waitDur := timeoutInitialSleep + + for { + select { + case <-ctx.Done(): + return + case <-ttc.clock.After(waitDur): + waitDur = timeoutCheckInterval + } + ttc.checkTasks(ctx) + // ttc.checkWorkers(ctx) + } +} + +func (ttc *TimeoutChecker) checkTasks(ctx context.Context) { + timeoutThreshold := ttc.clock.Now().UTC().Add(-ttc.taskTimeout) + logger := log.With(). + Time("threshold", timeoutThreshold.Local()). + Logger() + logger.Debug().Msg("TimeoutChecker: finding active tasks that have not been touched since threshold") + + tasks, err := ttc.persist.FetchTimedOutTasks(ctx, timeoutThreshold) + if err != nil { + log.Error().Err(err).Msg("TimeoutChecker: error fetching timed-out tasks from database") + return + } + + if len(tasks) == 0 { + logger.Trace().Msg("TimeoutChecker: no timed-out tasks") + return + } + logger.Debug(). + Int("numTasks", len(tasks)). + Msg("TimeoutChecker: failing all active tasks that have not been touched since threshold") + + for _, task := range tasks { + ttc.timeoutTask(ctx, task) + } +} + +// timeoutTask marks a task as 'failed' due to a timeout. +func (ttc *TimeoutChecker) timeoutTask(ctx context.Context, task *persistence.Task) { + workerIdent, logger := ttc.assignedWorker(task) + + task.Activity = fmt.Sprintf("Task timed out on worker %s", workerIdent) + err := ttc.taskStateMachine.TaskStatusChange(ctx, task, api.TaskStatusFailed) + if err != nil { + logger.Error().Err(err).Msg("TimeoutChecker: error saving timed-out task to database") + } + + err = ttc.logStorage.WriteTimestamped(logger, task.Job.UUID, task.UUID, + fmt.Sprintf("Task timed out. It was assigned to worker %s, but untouched since %s", + workerIdent, task.LastTouchedAt.Format(time.RFC3339))) + if err != nil { + logger.Error().Err(err).Msg("TimeoutChecker: error writing timeout info to the task log") + } +} + +// assignedWorker returns a description of the worker assigned to this task, +// and a logger configured for it. +func (ttc *TimeoutChecker) assignedWorker(task *persistence.Task) (string, zerolog.Logger) { + logCtx := log.With().Str("task", task.UUID) + + if task.WorkerID == nil { + logger := logCtx.Logger() + logger.Warn().Msg("TimeoutChecker: task timed out, but was not assigned to any worker") + return "-unassigned-", logger + } + + if task.Worker == nil { + logger := logCtx.Logger() + logger.Warn().Uint("workerDBID", *task.WorkerID). + Msg("TimeoutChecker: task is assigned to worker that no longer exists") + return "-unknown-", logger + } + + logCtx = logCtx. + Str("worker", task.Worker.UUID). + Str("workerName", task.Worker.Name) + logger := logCtx.Logger() + logger.Warn().Msg("TimeoutChecker: task timed out") + + return task.Worker.Identifier(), logger +} + +// func (ttc *TimeoutChecker) checkWorkers(db *mgo.Database) { +// timeoutThreshold := UtcNow().Add(-ttc.config.ActiveWorkerTimeoutInterval) +// log.Debugf("Failing all awake workers that have not been seen since %s", timeoutThreshold) + +// var timedoutWorkers []Worker +// // find all awake workers that either have never been seen, or were seen long ago. +// query := M{ +// "status": workerStatusAwake, +// "$or": []M{ +// M{"last_activity": M{"$lte": timeoutThreshold}}, +// M{"last_activity": M{"$exists": false}}, +// }, +// } +// projection := M{ +// "_id": 1, +// "nickname": 1, +// "address": 1, +// "status": 1, +// } +// if err := db.C("flamenco_workers").Find(query).Select(projection).All(&timedoutWorkers); err != nil { +// log.Warningf("Error finding timed-out workers: %s", err) +// } + +// for _, worker := range timedoutWorkers { +// worker.Timeout(db, ttc.scheduler) +// } +// } diff --git a/internal/manager/timeout_checker/timeout_checker_test.go b/internal/manager/timeout_checker/timeout_checker_test.go new file mode 100644 index 00000000..c3a5f2b3 --- /dev/null +++ b/internal/manager/timeout_checker/timeout_checker_test.go @@ -0,0 +1,196 @@ +package timeout_checker + +// SPDX-License-Identifier: GPL-3.0-or-later + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "gorm.io/gorm" + + "git.blender.org/flamenco/internal/manager/persistence" + "git.blender.org/flamenco/internal/manager/timeout_checker/mocks" + "git.blender.org/flamenco/pkg/api" +) + +const taskTimeout = 20 * time.Minute + +func TestTimeoutCheckerTiming(t *testing.T) { + ttc, finish, mocks := timeoutCheckerTestFixtures(t) + defer finish() + + mocks.run(ttc) + + // Wait for the timeout checker to actually be sleeping, otherwise it could + // have a different sleep-start time than we expect. + time.Sleep(1 * time.Millisecond) + + // Determine the deadlines relative to the initial clock value. + initialTime := mocks.clock.Now().UTC() + deadlines := []time.Time{ + initialTime.Add(timeoutInitialSleep - taskTimeout), + initialTime.Add(timeoutInitialSleep - taskTimeout + 1*timeoutCheckInterval), + initialTime.Add(timeoutInitialSleep - taskTimeout + 2*timeoutCheckInterval), + } + + // Expect three fetches, one after the initial sleep time, and two a regular interval later. + fetchTimes := make([]time.Time, len(deadlines)) + firstCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[0]). + DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { + fetchTimes[0] = mocks.clock.Now().UTC() + return []*persistence.Task{}, nil + }) + + secondCall := mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[1]). + DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { + fetchTimes[1] = mocks.clock.Now().UTC() + // Return a database error. This shouldn't break the check loop. + return []*persistence.Task{}, errors.New("testing what errors do") + }). + After(firstCall) + + mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, deadlines[2]). + DoAndReturn(func(ctx context.Context, timeout time.Time) ([]*persistence.Task, error) { + fetchTimes[2] = mocks.clock.Now().UTC() + return []*persistence.Task{}, nil + }). + After(secondCall) + + mocks.clock.Add(2 * time.Minute) // Should still be sleeping. + mocks.clock.Add(2 * time.Minute) // Should still be sleeping. + mocks.clock.Add(time.Minute) // Should trigger the first fetch. + mocks.clock.Add(time.Minute) // Should trigger the second fetch. + mocks.clock.Add(time.Minute) // Should trigger the third fetch. + + // Wait for the timeout checker to actually run & hit the expected calls. + time.Sleep(1 * time.Millisecond) + + for idx, fetchTime := range fetchTimes { + // Check for zero values first, because they can be a bit confusing in the assert.Equal() logs. + if !assert.Falsef(t, fetchTime.IsZero(), "fetchTime[%d] should not be zero", idx) { + continue + } + expect := initialTime.Add(timeoutInitialSleep + time.Duration(idx)*timeoutCheckInterval) + assert.Equalf(t, expect, fetchTime, "fetchTime[%d] not as expected", idx) + } +} + +func TestTaskTimeout(t *testing.T) { + ttc, finish, mocks := timeoutCheckerTestFixtures(t) + defer finish() + + mocks.run(ttc) + + // Wait for the timeout checker to actually be sleeping, otherwise it could + // have a different sleep-start time than we expect. + time.Sleep(1 * time.Millisecond) + + lastTime := mocks.clock.Now().UTC().Add(-1 * time.Hour) + + job := persistence.Job{UUID: "JOB-UUID"} + worker := persistence.Worker{ + UUID: "WORKER-UUID", + Name: "Tester", + Model: gorm.Model{ID: 47}, + } + taskUnassigned := persistence.Task{ + UUID: "TASK-UUID-UNASSIGNED", + Job: &job, + LastTouchedAt: lastTime, + } + taskUnknownWorker := persistence.Task{ + UUID: "TASK-UUID-UNKNOWN", + Job: &job, + LastTouchedAt: lastTime, + WorkerID: &worker.ID, + } + taskAssigned := persistence.Task{ + UUID: "TASK-UUID-ASSIGNED", + Job: &job, + LastTouchedAt: lastTime, + WorkerID: &worker.ID, + Worker: &worker, + } + + mocks.persist.EXPECT().FetchTimedOutTasks(mocks.ctx, gomock.Any()). + Return([]*persistence.Task{&taskUnassigned, &taskUnknownWorker, &taskAssigned}, nil) + + mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnassigned, api.TaskStatusFailed) + mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskUnknownWorker, api.TaskStatusFailed) + mocks.taskStateMachine.EXPECT().TaskStatusChange(mocks.ctx, &taskAssigned, api.TaskStatusFailed) + + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnassigned.UUID, + "Task timed out. It was assigned to worker -unassigned-, but untouched since 1969-12-31T23:00:00Z") + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskUnknownWorker.UUID, + "Task timed out. It was assigned to worker -unknown-, but untouched since 1969-12-31T23:00:00Z") + mocks.logStorage.EXPECT().WriteTimestamped(gomock.Any(), job.UUID, taskAssigned.UUID, + "Task timed out. It was assigned to worker Tester (WORKER-UUID), but untouched since 1969-12-31T23:00:00Z") + + // All the timeouts should be handled after the initial sleep. + mocks.clock.Add(timeoutInitialSleep) +} + +type TimeoutCheckerMocks struct { + clock *clock.Mock + persist *mocks.MockPersistenceService + taskStateMachine *mocks.MockTaskStateMachine + logStorage *mocks.MockLogStorage + + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup +} + +// run starts a goroutine to call ttc.Run(mocks.ctx). +func (mocks *TimeoutCheckerMocks) run(ttc *TimeoutChecker) { + mocks.wg.Add(1) + go func() { + defer mocks.wg.Done() + ttc.Run(mocks.ctx) + }() +} + +func timeoutCheckerTestFixtures(t *testing.T) (*TimeoutChecker, func(), *TimeoutCheckerMocks) { + mockCtrl := gomock.NewController(t) + + mocks := &TimeoutCheckerMocks{ + clock: clock.NewMock(), + persist: mocks.NewMockPersistenceService(mockCtrl), + taskStateMachine: mocks.NewMockTaskStateMachine(mockCtrl), + logStorage: mocks.NewMockLogStorage(mockCtrl), + + wg: new(sync.WaitGroup), + } + + // mockedNow, err := time.Parse(time.RFC3339, "2022-06-09T16:52:04+02:00") + // if err != nil { + // panic(err) + // } + // mocks.clock.Set(mockedNow) + + ctx, cancel := context.WithCancel(context.Background()) + mocks.ctx = ctx + mocks.cancel = cancel + + // This should be called at the end of each unit test. + finish := func() { + mocks.cancel() + mocks.wg.Wait() + mockCtrl.Finish() + } + + sm := New( + taskTimeout, + mocks.clock, + mocks.persist, + mocks.taskStateMachine, + mocks.logStorage, + ) + return sm, finish, mocks +}